From 4357c22078350a2f0ce4b0fe2b22012c5748875e Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 19 Dec 2025 18:41:56 +0100 Subject: [PATCH 1/7] feat: aggs processing and sinks for cdp --- ...dp_member_segment_aggregates_ds.datasource | 16 +++ ...anization_segment_aggregates_ds.datasource | 17 +++ ...ber_aggregates_bucket_backfiller_sink.pipe | 78 ++++++++++++ ...tes_changed_grandparent_segments_sink.pipe | 57 +++++++++ ...aggregates_changed_leaf_segments_sink.pipe | 30 +++++ ...gregates_changed_parent_segments_sink.pipe | 58 +++++++++ .../cdp_member_segment_aggregates_MV.pipe | 23 ++++ ...r_segment_aggregates_initial_snapshot.pipe | 25 ++++ ...ion_aggregates_bucket_backfiller_sink.pipe | 117 ++++++++++++++++++ ...tes_changed_grandparent_segments_sink.pipe | 60 +++++++++ ...aggregates_changed_leaf_segments_sink.pipe | 32 +++++ ...gregates_changed_parent_segments_sink.pipe | 59 +++++++++ ...dp_organization_segment_aggregates_MV.pipe | 24 ++++ ...n_segment_aggregates_initial_snapshot.pipe | 26 ++++ 14 files changed, 622 insertions(+) create mode 100644 services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource create mode 100644 services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe create mode 100644 
services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe diff --git a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource new file mode 100644 index 0000000000..bf4729d3f3 --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource @@ -0,0 +1,16 @@ + +SCHEMA > + `segmentId` String, + `memberId` String, + `tenantId` String, + `activityCountState` AggregateFunction(count, String), + `lastActiveState` AggregateFunction(max, DateTime64(3)), + `activityTypesState` AggregateFunction(groupArrayDistinct, String), + `activeOnState` AggregateFunction(groupArrayDistinct, String), + `averageSentimentState` AggregateFunction(avg, Int8), + `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), + `updatedAt` DateTime64(3) + +ENGINE "AggregatingMergeTree" +ENGINE_PARTITION_KEY "toYear(updatedAt)" +ENGINE_SORTING_KEY "segmentId, memberId" diff --git a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource new file mode 100644 index 0000000000..9c2a5af85b --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource @@ -0,0 +1,17 @@ + +SCHEMA > + `segmentId` String, + `organizationId` String, + `tenantId` String, + `joinedAtState` AggregateFunction(min, DateTime64(3)), + `lastActiveState` AggregateFunction(max, 
DateTime64(3)), + `activeOnState` AggregateFunction(groupArrayDistinct, String), + `activityCountState` AggregateFunction(count, String), + `memberCountState` AggregateFunction(countDistinct, String), + `avgContributorEngagement` AggregateFunction(avg, Int8), + `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), + `updatedAt` DateTime64(3) + +ENGINE "AggregatingMergeTree" +ENGINE_PARTITION_KEY "toYear(updatedAt)" +ENGINE_SORTING_KEY "segmentId, organizationId" diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..a49d64d437 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,78 @@ +NODE leaf_segment_aggregates +SQL > + + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds + {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + GROUP BY segmentId, memberId + + + +NODE parent_segment_aggregates +SQL > + + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds 
as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + GROUP BY parentId, memberId + + + +NODE grandparent_segment_aggregates +SQL > + + % + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} WHERE cityHash64(grandparentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + GROUP BY grandparentId, memberId + + + +NODE cdp_member_segment_aggs_union +SQL > + + select * from leaf_segment_aggregates + union all + select * from parent_segment_aggregates + union all + select * from grandparent_segment_aggregates + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..4dfce665f0 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,57 @@ +NODE members_with_changed_aggs_previous_day +SQL > + + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE 
segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY grandparentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 30 1 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..5a93af22c1 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,30 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + 
avgMerge(averageSentimentState) AS averageSentiment, + now() as updatedAt + FROM cdp_member_segment_aggregates_ds + WHERE + (memberId, segmentId) in ( + select distinct memberId, segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, memberId, updatedAt + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 0 1 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..4497968df8 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,58 @@ +NODE members_with_changed_aggs_previous_day +SQL > + + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + 
where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY parentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 0 1 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe new file mode 100644 index 0000000000..12cfe83cbd --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe @@ -0,0 +1,23 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countState(activityId) AS activityCountState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(type) AS activityTypesState, + groupArrayDistinctState(platform) AS activeOnState, + avgState(sentimentScore) AS averageSentimentState, + maxState(updatedAt) AS lastActivityUpdatedAtState, + now64(3) as updatedAt + FROM activityRelations_enrich_snapshot_MV_ds + GROUP BY + segmentId, + memberId + +TYPE materialized +DATASOURCE cdp_member_segment_aggregates_ds + + diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe new file mode 100644 index 0000000000..f3d5c4acf8 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe @@ -0,0 +1,25 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countState(activityId) AS activityCountState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(type) AS activityTypesState, + groupArrayDistinctState(platform) AS activeOnState, + 
avgState(sentimentScore) AS averageSentimentState, + maxState(act.updatedAt) as lastActivityUpdatedAtState, + max(act.updatedAt) as updatedAt + FROM activityRelations_enriched_deduplicated_ds act + GROUP BY + segmentId, + memberId + +TYPE copy +TARGET_DATASOURCE cdp_member_segment_aggregates_ds +COPY_MODE replace +COPY_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..792438bed4 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,117 @@ +NODE leaf_segment_aggregates +SQL > + + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, organizationId + + + +NODE parent_segment_aggregates +SQL > + + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + 
FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, organizationId + + + +NODE grandparent_segment_aggregates +SQL > + + % + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(grandparentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, organizationId + + + +NODE cdp_organization_segment_aggs_union +SQL > + + select * from leaf_segment_aggregates + union all + select * from parent_segment_aggregates + union all + select * from grandparent_segment_aggregates + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..a7c23753b5 --- /dev/null +++ 
b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,60 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where + organizationId <> '' + and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY grandparentId, organizationId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 30 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..dfd2f797a0 --- 
/dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,32 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds + WHERE + organizationId <> '' + AND (organizationId, segmentId) in ( + select distinct organizationId, segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, organizationId, updatedAt + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 0 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..133fb970e9 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,59 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= 
toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY parentId, organizationId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 0 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe new file mode 100644 index 0000000000..a5a2bada28 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe @@ -0,0 +1,24 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minState(timestamp) as joinedAtState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(platform) AS activeOnState, + countState(activityId) AS activityCountState, + countDistinctState(memberId) as memberCountState, + avgState(score) AS avgContributorEngagement, + maxState(updatedAt) as lastActivityUpdatedAtState, + now64(3) as updatedAt + FROM activityRelations_enrich_snapshot_MV_ds + GROUP BY + segmentId, + 
organizationId + +TYPE materialized +DATASOURCE cdp_organization_segment_aggregates_ds + + diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe new file mode 100644 index 0000000000..0b9a3a11b1 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe @@ -0,0 +1,26 @@ +NODE cdp_org_aggregates_sink_initial_snapshot +SQL > + + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minState(timestamp) as joinedAtState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(platform) AS activeOnState, + countState(activityId) AS activityCountState, + countDistinctState(memberId) as memberCountState, + avgState(score) AS avgContributorEngagement, + maxState(act.updatedAt) as lastActivityUpdatedAtState, + max(act.updatedAt) as updatedAt + FROM activityRelations_enriched_deduplicated_ds act + GROUP BY + segmentId, + organizationId + +TYPE copy +TARGET_DATASOURCE cdp_organization_segment_aggregates_ds +COPY_MODE replace +COPY_SCHEDULE @on-demand + + From f3e7aa15c944c36a370e6ca5a6ae72b183e4165c Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 19 Dec 2025 18:47:02 +0100 Subject: [PATCH 2/7] chore: formatting new resources --- ...dp_member_segment_aggregates_ds.datasource | 7 +- ...anization_segment_aggregates_ds.datasource | 7 +- ...ber_aggregates_bucket_backfiller_sink.pipe | 70 +++++++++++++------ ...tes_changed_grandparent_segments_sink.pipe | 17 ++--- ...aggregates_changed_leaf_segments_sink.pipe | 9 ++- ...gregates_changed_parent_segments_sink.pipe | 17 ++--- .../cdp_member_segment_aggregates_MV.pipe | 9 +-- ...r_segment_aggregates_initial_snapshot.pipe | 9 +-- ...ion_aggregates_bucket_backfiller_sink.pipe | 27 +++---- ...tes_changed_grandparent_segments_sink.pipe | 17 ++--- ...aggregates_changed_leaf_segments_sink.pipe | 9 ++- 
...gregates_changed_parent_segments_sink.pipe | 17 ++--- ...dp_organization_segment_aggregates_MV.pipe | 9 +-- ...n_segment_aggregates_initial_snapshot.pipe | 9 +-- 14 files changed, 96 insertions(+), 137 deletions(-) diff --git a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource index bf4729d3f3..3ee3f235e1 100644 --- a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource +++ b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource @@ -1,4 +1,3 @@ - SCHEMA > `segmentId` String, `memberId` String, @@ -11,6 +10,6 @@ SCHEMA > `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), `updatedAt` DateTime64(3) -ENGINE "AggregatingMergeTree" -ENGINE_PARTITION_KEY "toYear(updatedAt)" -ENGINE_SORTING_KEY "segmentId, memberId" +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY toYear(updatedAt) +ENGINE_SORTING_KEY segmentId, memberId diff --git a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource index 9c2a5af85b..9ad5d7671e 100644 --- a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource +++ b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource @@ -1,4 +1,3 @@ - SCHEMA > `segmentId` String, `organizationId` String, @@ -12,6 +11,6 @@ SCHEMA > `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), `updatedAt` DateTime64(3) -ENGINE "AggregatingMergeTree" -ENGINE_PARTITION_KEY "toYear(updatedAt)" -ENGINE_SORTING_KEY "segmentId, organizationId" +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY toYear(updatedAt) +ENGINE_SORTING_KEY segmentId, organizationId diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe 
b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe index a49d64d437..bb177f85c9 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggregates SQL > - % SELECT segmentId, @@ -11,16 +10,24 @@ SQL > groupArrayDistinctMerge(activityTypesState) AS activityTypes, groupArrayDistinctMerge(activeOnState) AS activeOn, avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt + maxMerge(updatedAtState) AS updatedAt FROM cdp_member_segment_aggregates_ds - {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} GROUP BY segmentId, memberId - - NODE parent_segment_aggregates SQL > - % SELECT parentId as segmentId, @@ -31,17 +38,25 @@ SQL > groupArrayDistinctMerge(activityTypesState) AS activityTypes, groupArrayDistinctMerge(activeOnState) AS activeOn, avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt + maxMerge(updatedAtState) AS updatedAt FROM cdp_member_segment_aggregates_ds as cdp_aggs join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} GROUP BY parentId, memberId - - NODE grandparent_segment_aggregates SQL > - % SELECT grandparentId as segmentId, 
@@ -55,24 +70,35 @@ SQL > maxMerge(updatedAtState) AS updatedAt FROM cdp_member_segment_aggregates_ds as cdp_aggs join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} WHERE cityHash64(grandparentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + {% if defined(bucket_id) %} + WHERE + cityHash64(grandparentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} GROUP BY grandparentId, memberId - - NODE cdp_member_segment_aggs_union SQL > - - select * from leaf_segment_aggregates + select * + from leaf_segment_aggregates union all - select * from parent_segment_aggregates + select * + from parent_segment_aggregates union all - select * from grandparent_segment_aggregates + select * + from grandparent_segment_aggregates -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink EXPORT_SCHEDULE @on-demand - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe index 4dfce665f0..709bc09d5a 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe @@ -1,15 +1,11 @@ NODE members_with_changed_aggs_previous_day SQL > - select distinct memberId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -21,16 +17,11 @@ SQL > select distinct segmentId from cdp_member_segment_aggregates_ds where 
updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE grandparent_segment_aggs_updated_previous_day SQL > - SELECT grandparentId as segmentId, memberId, @@ -48,10 +39,10 @@ SQL > and memberId in (select memberId from members_with_changed_aggs_previous_day) GROUP BY grandparentId, memberId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink EXPORT_SCHEDULE 30 1 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe index 5a93af22c1..5eaa3a7ae1 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggs_updated_previous_day SQL > - % SELECT segmentId, @@ -21,10 +20,10 @@ SQL > ) GROUP BY segmentId, memberId, updatedAt -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink EXPORT_SCHEDULE 0 1 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe index 4497968df8..6e3ff065c2 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe @@ -1,15 +1,11 @@ NODE members_with_changed_aggs_previous_day SQL > - select distinct memberId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 
DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -21,16 +17,11 @@ SQL > select distinct segmentId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE cdp_member_aggregates_sink_daily_parent_segments_1 SQL > - % SELECT parentId as segmentId, @@ -49,10 +40,10 @@ SQL > and memberId in (select memberId from members_with_changed_aggs_previous_day) GROUP BY parentId, memberId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink EXPORT_SCHEDULE 0 1 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe index 12cfe83cbd..3a71264cb6 100644 --- a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe @@ -1,6 +1,5 @@ NODE cdp_member_aggregates_sink_initial_snapshot_0 SQL > - SELECT segmentId, memberId, @@ -13,11 +12,7 @@ SQL > maxState(updatedAt) AS lastActivityUpdatedAtState, now64(3) as updatedAt FROM activityRelations_enrich_snapshot_MV_ds - GROUP BY - segmentId, - memberId + GROUP BY segmentId, memberId -TYPE materialized +TYPE MATERIALIZED DATASOURCE cdp_member_segment_aggregates_ds - - diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe index f3d5c4acf8..b6a69804fd 100644 --- a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe @@ -1,6 +1,5 @@ NODE cdp_member_aggregates_sink_initial_snapshot_0 SQL > - SELECT segmentId, memberId, @@ -13,13 
+12,9 @@ SQL > maxState(act.updatedAt) as lastActivityUpdatedAtState, max(act.updatedAt) as updatedAt FROM activityRelations_enriched_deduplicated_ds act - GROUP BY - segmentId, - memberId + GROUP BY segmentId, memberId -TYPE copy +TYPE COPY TARGET_DATASOURCE cdp_member_segment_aggregates_ds COPY_MODE replace COPY_SCHEDULE @on-demand - - diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe index 792438bed4..710e2c56b5 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggregates SQL > - % SELECT segmentId, @@ -29,11 +28,8 @@ SQL > {% end %} GROUP BY segmentId, organizationId - - NODE parent_segment_aggregates SQL > - % SELECT parentId as segmentId, @@ -63,11 +59,8 @@ SQL > {% end %} GROUP BY parentId, organizationId - - NODE grandparent_segment_aggregates SQL > - % SELECT grandparentId as segmentId, @@ -97,21 +90,21 @@ SQL > {% end %} GROUP BY grandparentId, organizationId - - NODE cdp_organization_segment_aggs_union SQL > - - select * from leaf_segment_aggregates + select * + from leaf_segment_aggregates union all - select * from parent_segment_aggregates + select * + from parent_segment_aggregates union all - select * from grandparent_segment_aggregates + select * + from grandparent_segment_aggregates -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE @on-demand - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe 
index a7c23753b5..e8f8b77256 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe @@ -1,17 +1,13 @@ NODE organizations_with_changed_aggs_previous_day SQL > - select distinct organizationId from cdp_organization_segment_aggregates_ds where organizationId <> '' and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -23,16 +19,11 @@ SQL > select distinct segmentId from cdp_organization_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE grandparent_segment_aggs_updated_previous_day SQL > - SELECT grandparentId as segmentId, organizationId, @@ -51,10 +42,10 @@ SQL > and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) GROUP BY grandparentId, organizationId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE 30 2 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe index dfd2f797a0..e39d226927 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggs_updated_previous_day SQL > - % SELECT segmentId, @@ -23,10 +22,10 @@ SQL > ) GROUP BY segmentId, organizationId, updatedAt -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME 
lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE 0 2 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe index 133fb970e9..bd8cb4dc36 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -1,15 +1,11 @@ NODE organizations_with_changed_aggs_previous_day SQL > - select distinct organizationId from cdp_organization_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -21,16 +17,11 @@ SQL > select distinct segmentId from cdp_organization_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE cdp_member_aggregates_sink_daily_parent_segments_1 SQL > - % SELECT parentId as segmentId, @@ -50,10 +41,10 @@ SQL > and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) GROUP BY parentId, organizationId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE 0 2 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe index a5a2bada28..93a5a3e4b5 100644 --- a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe +++ 
b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe @@ -1,6 +1,5 @@ NODE cdp_member_aggregates_sink_initial_snapshot_0 SQL > - SELECT segmentId, organizationId, @@ -14,11 +13,7 @@ SQL > maxState(updatedAt) as lastActivityUpdatedAtState, now64(3) as updatedAt FROM activityRelations_enrich_snapshot_MV_ds - GROUP BY - segmentId, - organizationId + GROUP BY segmentId, organizationId -TYPE materialized +TYPE MATERIALIZED DATASOURCE cdp_organization_segment_aggregates_ds - - diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe index 0b9a3a11b1..e1e84adaed 100644 --- a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe @@ -1,6 +1,5 @@ NODE cdp_org_aggregates_sink_initial_snapshot SQL > - SELECT segmentId, organizationId, @@ -14,13 +13,9 @@ SQL > maxState(act.updatedAt) as lastActivityUpdatedAtState, max(act.updatedAt) as updatedAt FROM activityRelations_enriched_deduplicated_ds act - GROUP BY - segmentId, - organizationId + GROUP BY segmentId, organizationId -TYPE copy +TYPE COPY TARGET_DATASOURCE cdp_organization_segment_aggregates_ds COPY_MODE replace COPY_SCHEDULE @on-demand - - From ebda0c04eeac0834a28c344277cb069bceb52886 Mon Sep 17 00:00:00 2001 From: anil Date: Wed, 28 Jan 2026 00:04:50 +0100 Subject: [PATCH 3/7] fix: member sinks split into different pipes for each type of segment --- ...tes_changed_grandparent_segments_sink.pipe | 57 ++++++++++++++++++ ...aggregates_changed_leaf_segments_sink.pipe | 30 ++++++++++ ...gregates_changed_parent_segments_sink.pipe | 58 +++++++++++++++++++ ...arent_segments_bucket_backfiller_sink.pipe | 37 ++++++++++++ ..._leaf_segments_bucket_backfiller_sink.pipe | 36 ++++++++++++ ...arent_segments_bucket_backfiller_sink.pipe | 37 ++++++++++++
...arent_segments_bucket_backfiller_sink.pipe | 36 ++++++++++++ ..._leaf_segments_bucket_backfiller_sink.pipe | 35 +++++++++++ ...arent_segments_bucket_backfiller_sink.pipe | 36 ++++++++++++ 9 files changed, 362 insertions(+) create mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..4dfce665f0 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,57 @@ +NODE members_with_changed_aggs_previous_day +SQL > + + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + 
grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY grandparentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 30 1 * * * + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..5a93af22c1 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,30 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() as updatedAt + FROM 
cdp_member_segment_aggregates_ds + WHERE + (memberId, segmentId) in ( + select distinct memberId, segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, memberId, updatedAt + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 0 1 * * * + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..4497968df8 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,58 @@ +NODE members_with_changed_aggs_previous_day +SQL > + + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from 
segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY parentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 0 1 * * * + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..24d8ea33a0 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe @@ -0,0 +1,37 @@ +NODE grandparent_segment_aggs +SQL > + + % + SELECT + grandparentId as segmentId, + cdp_aggs.memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(cdp_aggs.activityCountState) AS activityCount, + maxMerge(cdp_aggs.lastActiveState) AS lastActive, + groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, + groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, + avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + cityHash64(grandparentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..93a7027f63 --- 
/dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe @@ -0,0 +1,36 @@ +NODE leaf_segment_aggregates +SQL > + + % + SELECT + cdp_aggs.segmentId, + cdp_aggs.memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(cdp_aggs.activityCountState) AS activityCount, + maxMerge(cdp_aggs.lastActiveState) AS lastActive, + groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, + groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, + avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds cdp_aggs + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..6b1c49fe5d --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe @@ -0,0 +1,37 @@ +NODE parent_segment_aggregates +SQL > + + % + SELECT + parentId as segmentId, + cdp_aggs.memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(cdp_aggs.activityCountState) AS activityCount, + maxMerge(cdp_aggs.lastActiveState) AS lastActive, + groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, + groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, + avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds 
cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + cityHash64(parentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..5380d2ba92 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe @@ -0,0 +1,36 @@ +NODE grandparent_segment_aggs +SQL > + % + SELECT + grandparentId as segmentId, + cdp_aggs.memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(cdp_aggs.activityCountState) AS activityCount, + maxMerge(cdp_aggs.lastActiveState) AS lastActive, + groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, + groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, + avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + cityHash64(grandparentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, memberId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE @on-demand +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe 
b/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..632f6d1b9b --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe @@ -0,0 +1,35 @@ +NODE leaf_segment_aggregates +SQL > + % + SELECT + cdp_aggs.segmentId, + cdp_aggs.memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(cdp_aggs.activityCountState) AS activityCount, + maxMerge(cdp_aggs.lastActiveState) AS lastActive, + groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, + groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, + avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds cdp_aggs + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, memberId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE @on-demand +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..b180e759fe --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe @@ -0,0 +1,36 @@ +NODE parent_segment_aggregates +SQL > + % + SELECT + parentId as segmentId, + cdp_aggs.memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(cdp_aggs.activityCountState) AS activityCount, + maxMerge(cdp_aggs.lastActiveState) AS lastActive, + groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, + 
groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, + avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + cityHash64(parentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, memberId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE @on-demand +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink From 99ad5accd6b549f3ffaff8fa737861e232198a9d Mon Sep 17 00:00:00 2001 From: anil Date: Wed, 28 Jan 2026 00:05:33 +0100 Subject: [PATCH 4/7] fix: org sinks for changed data split by segment type --- ...ion_aggregates_bucket_backfiller_sink.pipe | 117 ++++++++++++++++++ ...tes_changed_grandparent_segments_sink.pipe | 60 +++++++++ ...aggregates_changed_leaf_segments_sink.pipe | 32 +++++ ...gregates_changed_parent_segments_sink.pipe | 61 +++++++++ ...ion_aggregates_bucket_backfiller_sink.pipe | 6 +- ...gregates_changed_parent_segments_sink.pipe | 6 +- 6 files changed, 277 insertions(+), 5 deletions(-) create mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe create mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..3f538d9ab3
--- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,117 @@ +NODE leaf_segment_aggregates +SQL > + + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, organizationId + + + +NODE parent_segment_aggregates +SQL > + + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(parentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, organizationId + + + +NODE grandparent_segment_aggregates +SQL > + + % + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + 
maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(grandparentId) % 5 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, organizationId + + + +NODE cdp_organization_segment_aggs_union +SQL > + + select * from leaf_segment_aggregates + union all + select * from parent_segment_aggregates + union all + select * from grandparent_segment_aggregates + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..a7c23753b5 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,60 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where + organizationId <> '' + and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where 
updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY grandparentId, organizationId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 30 2 * * * + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..dfd2f797a0 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,32 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM 
cdp_organization_segment_aggregates_ds + WHERE + organizationId <> '' + AND (organizationId, segmentId) in ( + select distinct organizationId, segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, organizationId, updatedAt + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 0 2 * * * + + diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..4cc0be2ce1 --- /dev/null +++ b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,61 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where + organizationId <> '' + AND updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, 
+ now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY parentId, organizationId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 30 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe index 710e2c56b5..e0e8351255 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -16,7 +16,7 @@ SQL > {% if defined(bucket_id) %} WHERE organizationId <> '' - AND cityHash64(segmentId) % 3 + AND cityHash64(segmentId) % 5 = {{ UInt8( bucket_id, @@ -47,7 +47,7 @@ SQL > {% if defined(bucket_id) %} WHERE organizationId <> '' - AND cityHash64(segmentId) % 3 + AND cityHash64(parentId) % 5 = {{ UInt8( bucket_id, @@ -78,7 +78,7 @@ SQL > {% if defined(bucket_id) %} WHERE organizationId <> '' - AND cityHash64(grandparentId) % 3 + AND cityHash64(grandparentId) % 5 = {{ UInt8( bucket_id, diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe index bd8cb4dc36..3627793ad5 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -2,7 +2,9 @@ NODE organizations_with_changed_aggs_previous_day SQL > select distinct organizationId from 
cdp_organization_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + where + organizationId <> '' + AND updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) NODE segments_with_changed_aggs_previous_day SQL > @@ -44,7 +46,7 @@ SQL > TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_SCHEDULE 0 2 * * * +EXPORT_SCHEDULE 30 2 * * * EXPORT_FORMAT csv EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink From a075de127cda7468a48d2e12f923a364cd138592 Mon Sep 17 00:00:00 2001 From: anil Date: Wed, 28 Jan 2026 00:09:54 +0100 Subject: [PATCH 5/7] feat: improved format script with wildcard matching and doc --- services/libs/tinybird/scripts/format.sh | 85 ++++++++++++++++++++ services/libs/tinybird/scripts/format_all.sh | 27 ------- 2 files changed, 85 insertions(+), 27 deletions(-) create mode 100755 services/libs/tinybird/scripts/format.sh delete mode 100755 services/libs/tinybird/scripts/format_all.sh diff --git a/services/libs/tinybird/scripts/format.sh b/services/libs/tinybird/scripts/format.sh new file mode 100755 index 0000000000..cac577f95d --- /dev/null +++ b/services/libs/tinybird/scripts/format.sh @@ -0,0 +1,85 @@ +#!/bin/bash + +PIPES_FOLDER="../pipes" +DATA_SOURCES_FOLDER="../datasources" + +show_help() { + cat << EOF +Usage: format_all.sh [OPTIONS] + +Format Tinybird files in pipes and datasources folders. 
+ +OPTIONS: + --help Show this help message and exit + --sequential Run formatting sequentially instead of in parallel + --match PATTERN Match files containing PATTERN in their name (wildcard match) + +EXAMPLES: + format_all.sh + Format all files in parallel + + format_all.sh --match cdp_member + Format only files containing "cdp_member" in their name + + format_all.sh --match cdp_organization --sequential + Format files containing "cdp_organization" sequentially + +EOF + exit 0 +} + +# Parse command line arguments +SEQUENTIAL=false +MATCH_PATTERN="" + +while [[ $# -gt 0 ]]; do + case $1 in + --help|-h) + show_help + ;; + --sequential) + SEQUENTIAL=true + shift + ;; + --match) + if [ -z "$2" ] || [[ "$2" == --* ]]; then + echo "Error: --match requires a pattern argument" + echo "" + show_help + fi + MATCH_PATTERN="$2" + shift 2 + ;; + *) + echo "Error: Unknown option: $1" + echo "" + show_help + ;; + esac +done + +format_files_in_folder() { + local folder="$1" + for file in "$folder"/*; do + if [ -f "$file" ]; then + local basename=$(basename "$file") + + # Skip file if pattern is set and doesn't match + if [ -n "$MATCH_PATTERN" ] && [[ ! 
"$basename" == *$MATCH_PATTERN* ]]; then + continue + fi + + if [ "$SEQUENTIAL" = true ]; then + tb fmt --yes "$file" + else + tb fmt --yes "$file" & + fi + fi + done +} + +format_files_in_folder "$PIPES_FOLDER" +format_files_in_folder "$DATA_SOURCES_FOLDER" + +# Only wait for background processes in parallel mode +[ "$SEQUENTIAL" = false ] && wait diff --git a/services/libs/tinybird/scripts/format_all.sh b/services/libs/tinybird/scripts/format_all.sh deleted file mode 100755 index 186ef9ef0a..0000000000 --- a/services/libs/tinybird/scripts/format_all.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash - -PIPES_FOLDER="../pipes" -DATA_SOURCES_FOLDER="../datasources" - -# Parse command line arguments -SEQUENTIAL=false -[[ "$1" == "--sequential" ]] && SEQUENTIAL=true - -format_files_in_folder() { - local folder="$1" - for file in "$folder"/*; do - if [ -f "$file" ]; then - if [ "$SEQUENTIAL" = true ]; then - tb fmt --yes "$file" - else - tb fmt --yes "$file" & - fi - fi - done -} - -format_files_in_folder "$PIPES_FOLDER" -format_files_in_folder "$DATA_SOURCES_FOLDER" - -# Only wait for background processes in parallel mode -[ "$SEQUENTIAL" = false ] && wait From 0b4c11f51a7de9d0de2872dfc1c425bcd8420984 Mon Sep 17 00:00:00 2001 From: anil Date: Wed, 28 Jan 2026 00:16:49 +0100 Subject: [PATCH 6/7] chore: remove unused pipe definition --- ...ber_aggregates_bucket_backfiller_sink.pipe | 104 ------------------ 1 file changed, 104 deletions(-) delete mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe deleted file mode 100644 index bb177f85c9..0000000000 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe +++ /dev/null @@ -1,104 +0,0 @@ -NODE leaf_segment_aggregates -SQL > - % - SELECT - segmentId, - memberId, - 
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(activityCountState) AS activityCount, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activityTypesState) AS activityTypes, - groupArrayDistinctMerge(activeOnState) AS activeOn, - avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt - FROM cdp_member_segment_aggregates_ds - {% if defined(bucket_id) %} - WHERE - cityHash64(segmentId) % 10 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY segmentId, memberId - -NODE parent_segment_aggregates -SQL > - % - SELECT - parentId as segmentId, - memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(activityCountState) AS activityCount, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activityTypesState) AS activityTypes, - groupArrayDistinctMerge(activeOnState) AS activeOn, - avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt - FROM cdp_member_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} - WHERE - cityHash64(segmentId) % 10 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY parentId, memberId - -NODE grandparent_segment_aggregates -SQL > - % - SELECT - grandparentId as segmentId, - memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(activityCountState) AS activityCount, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activityTypesState) AS activityTypes, - groupArrayDistinctMerge(activeOnState) AS activeOn, - avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt - FROM cdp_member_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} 
- WHERE - cityHash64(grandparentId) % 10 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY grandparentId, memberId - -NODE cdp_member_segment_aggs_union -SQL > - select * - from leaf_segment_aggregates - union all - select * - from parent_segment_aggregates - union all - select * - from grandparent_segment_aggregates - -TYPE SINK -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_SCHEDULE @on-demand -EXPORT_FORMAT csv -EXPORT_STRATEGY @new -EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink From a3087580e717b6c1cba3c4f78a1c7c0906d295ac Mon Sep 17 00:00:00 2001 From: anil Date: Wed, 28 Jan 2026 00:26:54 +0100 Subject: [PATCH 7/7] chore: remove wrong pathed commited files --- ...tes_changed_grandparent_segments_sink.pipe | 57 --------- ...aggregates_changed_leaf_segments_sink.pipe | 30 ----- ...gregates_changed_parent_segments_sink.pipe | 58 --------- ...arent_segments_bucket_backfiller_sink.pipe | 37 ------ ..._leaf_segments_bucket_backfiller_sink.pipe | 36 ------ ...arent_segments_bucket_backfiller_sink.pipe | 37 ------ ...ion_aggregates_bucket_backfiller_sink.pipe | 117 ------------------ ...tes_changed_grandparent_segments_sink.pipe | 60 --------- ...aggregates_changed_leaf_segments_sink.pipe | 32 ----- ...gregates_changed_parent_segments_sink.pipe | 61 --------- 10 files changed, 525 deletions(-) delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe delete mode 100644 
scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe delete mode 100644 scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe deleted file mode 100644 index 4dfce665f0..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe +++ /dev/null @@ -1,57 +0,0 @@ -NODE members_with_changed_aggs_previous_day -SQL > - - select distinct memberId - from cdp_member_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - - -NODE segments_with_changed_aggs_previous_day -SQL > - - select id as segmentId - from segments - where - grandparentId in ( - select grandparentId - from segments - where - id in ( - select distinct segmentId - from cdp_member_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - ) - - ) - - - -NODE grandparent_segment_aggs_updated_previous_day -SQL > - - SELECT - grandparentId as segmentId, - memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(activityCountState) AS activityCount, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activityTypesState) AS activityTypes, - 
groupArrayDistinctMerge(activeOnState) AS activeOn, - avgMerge(averageSentimentState) AS averageSentiment, - now() AS updatedAt - FROM cdp_member_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - where - cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) - and memberId in (select memberId from members_with_changed_aggs_previous_day) - GROUP BY grandparentId, memberId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink -EXPORT_SCHEDULE 30 1 * * * - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe deleted file mode 100644 index 5a93af22c1..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe +++ /dev/null @@ -1,30 +0,0 @@ -NODE leaf_segment_aggs_updated_previous_day -SQL > - - % - SELECT - segmentId, - memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(activityCountState) AS activityCount, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activityTypesState) AS activityTypes, - groupArrayDistinctMerge(activeOnState) AS activeOn, - avgMerge(averageSentimentState) AS averageSentiment, - now() as updatedAt - FROM cdp_member_segment_aggregates_ds - WHERE - (memberId, segmentId) in ( - select distinct memberId, segmentId - from cdp_member_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - GROUP BY segmentId, memberId, updatedAt - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink -EXPORT_SCHEDULE 0 1 * * * - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe 
b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe deleted file mode 100644 index 4497968df8..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe +++ /dev/null @@ -1,58 +0,0 @@ -NODE members_with_changed_aggs_previous_day -SQL > - - select distinct memberId - from cdp_member_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - - -NODE segments_with_changed_aggs_previous_day -SQL > - - select id as segmentId - from segments - where - parentId in ( - select parentId - from segments - where - id in ( - select distinct segmentId - from cdp_member_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - ) - - ) - - - -NODE cdp_member_aggregates_sink_daily_parent_segments_1 -SQL > - - % - SELECT - parentId as segmentId, - memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(activityCountState) AS activityCount, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activityTypesState) AS activityTypes, - groupArrayDistinctMerge(activeOnState) AS activeOn, - avgMerge(averageSentimentState) AS averageSentiment, - now() AS updatedAt - FROM cdp_member_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - where - cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) - and memberId in (select memberId from members_with_changed_aggs_previous_day) - GROUP BY parentId, memberId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink -EXPORT_SCHEDULE 0 1 * * * - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe deleted 
file mode 100644 index 24d8ea33a0..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_grandparent_segments_bucket_backfiller_sink.pipe +++ /dev/null @@ -1,37 +0,0 @@ -NODE grandparent_segment_aggs -SQL > - - % - SELECT - grandparentId as segmentId, - cdp_aggs.memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(cdp_aggs.activityCountState) AS activityCount, - maxMerge(cdp_aggs.lastActiveState) AS lastActive, - groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, - groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, - avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, - now() AS updatedAt - FROM cdp_member_segment_aggregates_ds cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} - WHERE - cityHash64(grandparentId) % 5 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY grandparentId, memberId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink -EXPORT_SCHEDULE @on-demand - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe deleted file mode 100644 index 93a7027f63..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_leaf_segments_bucket_backfiller_sink.pipe +++ /dev/null @@ -1,36 +0,0 @@ -NODE leaf_segment_aggregates -SQL > - - % - SELECT - cdp_aggs.segmentId, - cdp_aggs.memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(cdp_aggs.activityCountState) AS activityCount, - maxMerge(cdp_aggs.lastActiveState) AS lastActive, - groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, - groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, - 
avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, - now() AS updatedAt - FROM cdp_member_segment_aggregates_ds cdp_aggs - {% if defined(bucket_id) %} - WHERE - cityHash64(segmentId) % 5 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY segmentId, memberId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink -EXPORT_SCHEDULE @on-demand - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe deleted file mode 100644 index 6b1c49fe5d..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_member_aggregates_parent_segments_bucket_backfiller_sink.pipe +++ /dev/null @@ -1,37 +0,0 @@ -NODE parent_segment_aggregates -SQL > - - % - SELECT - parentId as segmentId, - cdp_aggs.memberId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - countMerge(cdp_aggs.activityCountState) AS activityCount, - maxMerge(cdp_aggs.lastActiveState) AS lastActive, - groupArrayDistinctMerge(cdp_aggs.activityTypesState) AS activityTypes, - groupArrayDistinctMerge(cdp_aggs.activeOnState) AS activeOn, - avgMerge(cdp_aggs.averageSentimentState) AS averageSentiment, - now() AS updatedAt - FROM cdp_member_segment_aggregates_ds cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} - WHERE - cityHash64(parentId) % 5 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY parentId, memberId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink -EXPORT_SCHEDULE @on-demand - - diff --git 
a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe deleted file mode 100644 index 3f538d9ab3..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe +++ /dev/null @@ -1,117 +0,0 @@ -NODE leaf_segment_aggregates -SQL > - - % - SELECT - segmentId, - organizationId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - minMerge(joinedAtState) as joinedAt, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activeOnState) AS activeOn, - countMerge(activityCountState) AS activityCount, - countDistinctMerge(memberCountState) as memberCount, - round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, - max(updatedAt) AS updatedAt - FROM cdp_organization_segment_aggregates_ds - {% if defined(bucket_id) %} - WHERE - organizationId <> '' - AND cityHash64(segmentId) % 5 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY segmentId, organizationId - - - -NODE parent_segment_aggregates -SQL > - - % - SELECT - parentId as segmentId, - organizationId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - minMerge(joinedAtState) as joinedAt, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activeOnState) AS activeOn, - countMerge(activityCountState) AS activityCount, - countDistinctMerge(memberCountState) as memberCount, - round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, - max(updatedAt) AS updatedAt - FROM cdp_organization_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} - WHERE - organizationId <> '' - AND cityHash64(parentId) % 5 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - 
GROUP BY parentId, organizationId - - - -NODE grandparent_segment_aggregates -SQL > - - % - SELECT - grandparentId as segmentId, - organizationId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - minMerge(joinedAtState) as joinedAt, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activeOnState) AS activeOn, - countMerge(activityCountState) AS activityCount, - countDistinctMerge(memberCountState) as memberCount, - round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, - max(updatedAt) AS updatedAt - FROM cdp_organization_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} - WHERE - organizationId <> '' - AND cityHash64(grandparentId) % 5 - = {{ - UInt8( - bucket_id, - 0, - description="This is bucket id of the activity segment", - required=False, - ) - }} - {% end %} - GROUP BY grandparentId, organizationId - - - -NODE cdp_organization_segment_aggs_union -SQL > - - select * from leaf_segment_aggregates - union all - select * from parent_segment_aggregates - union all - select * from grandparent_segment_aggregates - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink -EXPORT_SCHEDULE @on-demand - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe deleted file mode 100644 index a7c23753b5..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe +++ /dev/null @@ -1,60 +0,0 @@ -NODE organizations_with_changed_aggs_previous_day -SQL > - - select distinct organizationId - from cdp_organization_segment_aggregates_ds - where - organizationId <> '' - and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - - -NODE 
segments_with_changed_aggs_previous_day -SQL > - - select id as segmentId - from segments - where - grandparentId in ( - select grandparentId - from segments - where - id in ( - select distinct segmentId - from cdp_organization_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - ) - - ) - - - -NODE grandparent_segment_aggs_updated_previous_day -SQL > - - SELECT - grandparentId as segmentId, - organizationId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - minMerge(joinedAtState) as joinedAt, - maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activeOnState) AS activeOn, - countMerge(activityCountState) AS activityCount, - countDistinctMerge(memberCountState) as memberCount, - avgMerge(avgContributorEngagement) AS avgContributorEngagement, - now() as updatedAt - FROM cdp_organization_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - where - cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) - and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) - GROUP BY grandparentId, organizationId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink -EXPORT_SCHEDULE 30 2 * * * - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe deleted file mode 100644 index dfd2f797a0..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe +++ /dev/null @@ -1,32 +0,0 @@ -NODE leaf_segment_aggs_updated_previous_day -SQL > - - % - SELECT - segmentId, - organizationId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - minMerge(joinedAtState) as joinedAt, - maxMerge(lastActiveState) AS lastActive, - 
groupArrayDistinctMerge(activeOnState) AS activeOn, - countMerge(activityCountState) AS activityCount, - countDistinctMerge(memberCountState) as memberCount, - avgMerge(avgContributorEngagement) AS avgContributorEngagement, - now() as updatedAt - FROM cdp_organization_segment_aggregates_ds - WHERE - organizationId <> '' - AND (organizationId, segmentId) in ( - select distinct organizationId, segmentId - from cdp_organization_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - GROUP BY segmentId, organizationId, updatedAt - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink -EXPORT_SCHEDULE 0 2 * * * - - diff --git a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe deleted file mode 100644 index 4cc0be2ce1..0000000000 --- a/scripts/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe +++ /dev/null @@ -1,61 +0,0 @@ -NODE organizations_with_changed_aggs_previous_day -SQL > - - select distinct organizationId - from cdp_organization_segment_aggregates_ds - where - organizationId <> '' - AND updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - - -NODE segments_with_changed_aggs_previous_day -SQL > - - select id as segmentId - from segments - where - parentId in ( - select parentId - from segments - where - id in ( - select distinct segmentId - from cdp_organization_segment_aggregates_ds - where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - ) - - ) - - - -NODE cdp_member_aggregates_sink_daily_parent_segments_1 -SQL > - - % - SELECT - parentId as segmentId, - organizationId, - '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, - minMerge(joinedAtState) as joinedAt, - 
maxMerge(lastActiveState) AS lastActive, - groupArrayDistinctMerge(activeOnState) AS activeOn, - countMerge(activityCountState) AS activityCount, - countDistinctMerge(memberCountState) as memberCount, - avgMerge(avgContributorEngagement) AS avgContributorEngagement, - now() as updatedAt - FROM cdp_organization_segment_aggregates_ds as cdp_aggs - join segments s on s.id = cdp_aggs.segmentId - where - cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) - and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) - GROUP BY parentId, organizationId - -TYPE sink -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink -EXPORT_SCHEDULE 30 2 * * * - -