@@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range(
716716
717717MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
718718/*
719- Compute a deterministic hash for the `left_table_query_string` that will be used throughout
720- all the logic as the field to GROUP BY the data
719+ 0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout
720+ all the logic as the field to GROUP BY the data.
721721*/
722722WITH "entity_dataframe" AS (
723723 SELECT *,
@@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range(
739739
740740{% for featureview in featureviews %}
741741
742+ /*
743+ 1. Select only the columns required for this feature view, including its entity columns.
744+ */
745+
742746"{{ featureview.name }}__entity_dataframe" AS (
743747 SELECT
744748 {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
@@ -751,120 +755,57 @@ def _get_entity_df_event_timestamp_range(
751755 "{{featureview.name}}__entity_row_unique_id"
752756),
753757
754- /*
755- This query template performs the point-in-time correctness join for a single feature set table
756- to the provided entity table.
757-
758- 1. We first join the current feature_view to the entity dataframe that has been passed.
759- This JOIN has the following logic:
760- - For each row of the entity dataframe, only keep the rows where the `timestamp_field`
761- is less than or equal to the one provided in the entity dataframe
762- - If there is a TTL for the current feature_view, also keep only the rows where the `timestamp_field`
763- is greater than or equal to the one provided minus the TTL
764- - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
765- computed previously
766-
767- The output of this CTE will contain all the necessary information and already filtered out most
768- of the data that is not relevant.
769- */
770-
771- "{{ featureview.name }}__subquery" AS (
772- SELECT
773- "{{ featureview.timestamp_field }}" as "event_timestamp",
774- {{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }}
775- {{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
776- {% for feature in featureview.features %}
777- "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
778- {% endfor %}
779- FROM {{ featureview.table_subquery }}
780- WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}'
781- {% if featureview.ttl == 0 %}{% else %}
782- AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
783- {% endif %}
784- ),
785-
786- "{{ featureview.name }}__base" AS (
787- SELECT
788- "subquery".*,
789- "entity_dataframe"."entity_timestamp",
790- "entity_dataframe"."{{featureview.name}}__entity_row_unique_id"
791- FROM "{{ featureview.name }}__subquery" AS "subquery"
792- INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe"
793- ON TRUE
794- AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp"
795-
796- {% if featureview.ttl == 0 %}{% else %}
797- AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp")
798- {% endif %}
799-
800- {% for entity in featureview.entities %}
801- AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}"
802- {% endfor %}
803- ),
804-
805758/*
806759 2. If the `created_timestamp_column` has been set, we need to
807760 deduplicate the data first. This is done by calculating the
808761 `MAX("created_timestamp")` for each event_timestamp.
809- We then join the data on the next CTE
762+ Otherwise, the ASOF JOIN result is nondeterministic when several feature rows tie on the same timestamp:
763+ https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table
810764*/
765+
811766{% if featureview.created_timestamp_column %}
812767"{{ featureview.name }}__dedup" AS (
813768 SELECT
814- "{{ featureview.name}}__entity_row_unique_id" ,
769+ {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} ,
815770 "event_timestamp",
816771 MAX("created_timestamp") AS "created_timestamp"
817- FROM "{{ featureview.name }}__base "
818- GROUP BY "{{ featureview.name}}__entity_row_unique_id" , "event_timestamp"
772+ FROM "{{ featureview.table_subquery }}"
773+ GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} , "event_timestamp"
819774),
820775{% endif %}
821776
822777/*
823- 3. The data has been filtered during the first CTE "*__base"
824- Thus we only need to compute the latest timestamp of each feature.
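778+ 3. ASOF JOIN the reduced entity dataframe to the feature data (the deduplicated CTE when a `created_timestamp_column` is set), matching on the entity keys and on a feature timestamp that is not later than the entity timestamp.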
825779*/
826- "{{ featureview.name }}__latest" AS (
780+
781+ "{{ featureview.name }}__asof_join" AS (
827782 SELECT
828- "event_timestamp",
829- {% if featureview.created_timestamp_column %}"created_timestamp",{% endif %}
830- "{{featureview.name}}__entity_row_unique_id"
831- FROM
832- (
833- SELECT *,
834- ROW_NUMBER() OVER(
835- PARTITION BY "{{featureview.name}}__entity_row_unique_id"
836- ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %}
837- ) AS "row_number"
838- FROM "{{ featureview.name }}__base"
839- {% if featureview.created_timestamp_column %}
840- INNER JOIN "{{ featureview.name }}__dedup"
841- USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp")
842- {% endif %}
843- )
844- WHERE "row_number" = 1
783+ e.*,
784+ {% for feature in featureview.features %}
785+ v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
786+ {% endfor %},
787+ v."{{ featureview.timestamp_field }}"
788+ FROM "{{ featureview.name }}__entity_dataframe" e
789+ ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v
790+ MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}")
791+ USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %})
845792),
846793
847794/*
848- 4. Once we know the latest value of each feature for a given timestamp,
849- we can join again the data back to the original "base" dataset
795+ 4. If a TTL is configured, filter the ASOF JOIN output to remove rows whose feature timestamp is older than the entity timestamp minus the TTL.
850796*/
851- "{{ featureview.name }}__cleaned" AS (
852- SELECT "base".*
853- FROM "{{ featureview.name }}__base" AS "base"
854- INNER JOIN "{{ featureview.name }}__latest"
855- USING(
856- "{{featureview.name}}__entity_row_unique_id",
857- "event_timestamp"
858- {% if featureview.created_timestamp_column %}
859- ,"created_timestamp"
860- {% endif %}
861- )
862- ){% if loop.last %}{% else %}, {% endif %}
863797
798+ "{{ featureview.name }}__ttl" AS (
799+ SELECT *
800+ FROM "{{ featureview.name }}__asof_join"
801+ {% if featureview.ttl == 0 %}{% else %}
802+ WHERE "{{ featureview.timestamp_field }}" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp")
803+ {% endif %}
804+ ){% if loop.last %}{% else %}, {% endif %}
864805
865806{% endfor %}
866807/*
867- Joins the outputs of multiple time travel joins to a single table.
808+ Join the outputs of multiple time travel joins to a single table.
868809 The entity_dataframe dataset is our source of truth here.
869810 */
870811
@@ -877,7 +818,7 @@ def _get_entity_df_event_timestamp_range(
877818 {% for feature in featureview.features %}
878819 ,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}
879820 {% endfor %}
880- FROM "{{ featureview.name }}__cleaned "
881- ) "{{ featureview.name }}__cleaned " USING ("{{featureview.name}}__entity_row_unique_id")
821+ FROM "{{ featureview.name }}__ttl "
822+ ) "{{ featureview.name }}__ttl " USING ("{{featureview.name}}__entity_row_unique_id")
882823{% endfor %}
883824"""
0 commit comments