Skip to content

Commit da7b74a

Browse files
author
hkuepers
committed
Use ASOF JOIN in Snowflake offline store query
Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>
1 parent 88a92cf commit da7b74a

File tree

1 file changed

+35
-94
lines changed

1 file changed

+35
-94
lines changed

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 35 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range(
716716

717717
MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
718718
/*
719-
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
720-
all the logic as the field to GROUP BY the data
719+
0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout
720+
all the logic as the field to GROUP BY the data.
721721
*/
722722
WITH "entity_dataframe" AS (
723723
SELECT *,
@@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range(
739739
740740
{% for featureview in featureviews %}
741741
742+
/*
743+
1. Select only the required columns and entities of the featureview.
744+
*/
745+
742746
"{{ featureview.name }}__entity_dataframe" AS (
743747
SELECT
744748
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
@@ -751,120 +755,57 @@ def _get_entity_df_event_timestamp_range(
751755
"{{featureview.name}}__entity_row_unique_id"
752756
),
753757
754-
/*
755-
This query template performs the point-in-time correctness join for a single feature set table
756-
to the provided entity table.
757-
758-
1. We first join the current feature_view to the entity dataframe that has been passed.
759-
This JOIN has the following logic:
760-
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
761-
is less than the one provided in the entity dataframe
762-
- If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
763-
is higher than the one provided minus the TTL
764-
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
765-
computed previously
766-
767-
The output of this CTE will contain all the necessary information and already filtered out most
768-
of the data that is not relevant.
769-
*/
770-
771-
"{{ featureview.name }}__subquery" AS (
772-
SELECT
773-
"{{ featureview.timestamp_field }}" as "event_timestamp",
774-
{{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }}
775-
{{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
776-
{% for feature in featureview.features %}
777-
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
778-
{% endfor %}
779-
FROM {{ featureview.table_subquery }}
780-
WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}'
781-
{% if featureview.ttl == 0 %}{% else %}
782-
AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
783-
{% endif %}
784-
),
785-
786-
"{{ featureview.name }}__base" AS (
787-
SELECT
788-
"subquery".*,
789-
"entity_dataframe"."entity_timestamp",
790-
"entity_dataframe"."{{featureview.name}}__entity_row_unique_id"
791-
FROM "{{ featureview.name }}__subquery" AS "subquery"
792-
INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe"
793-
ON TRUE
794-
AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp"
795-
796-
{% if featureview.ttl == 0 %}{% else %}
797-
AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp")
798-
{% endif %}
799-
800-
{% for entity in featureview.entities %}
801-
AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}"
802-
{% endfor %}
803-
),
804-
805758
/*
806759
2. If the `created_timestamp_column` has been set, we need to
807760
deduplicate the data first. This is done by calculating the
808761
`MAX(created_at_timestamp)` for each event_timestamp.
809-
We then join the data on the next CTE
762+
Otherwise, the ASOF JOIN can have unstable side effects
763+
https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table
810764
*/
765+
811766
{% if featureview.created_timestamp_column %}
812767
"{{ featureview.name }}__dedup" AS (
813768
SELECT
814-
"{{featureview.name}}__entity_row_unique_id",
769+
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %},
815770
"event_timestamp",
816771
MAX("created_timestamp") AS "created_timestamp"
817-
FROM "{{ featureview.name }}__base"
818-
GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp"
772+
FROM "{{ featureview.table_subquery }}"
773+
GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, "event_timestamp"
819774
),
820775
{% endif %}
821776
822777
/*
823-
3. The data has been filtered during the first CTE "*__base"
824-
Thus we only need to compute the latest timestamp of each feature.
778+
3. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe.
825779
*/
826-
"{{ featureview.name }}__latest" AS (
780+
781+
"{{ featureview.name }}__asof_join" AS (
827782
SELECT
828-
"event_timestamp",
829-
{% if featureview.created_timestamp_column %}"created_timestamp",{% endif %}
830-
"{{featureview.name}}__entity_row_unique_id"
831-
FROM
832-
(
833-
SELECT *,
834-
ROW_NUMBER() OVER(
835-
PARTITION BY "{{featureview.name}}__entity_row_unique_id"
836-
ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %}
837-
) AS "row_number"
838-
FROM "{{ featureview.name }}__base"
839-
{% if featureview.created_timestamp_column %}
840-
INNER JOIN "{{ featureview.name }}__dedup"
841-
USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp")
842-
{% endif %}
843-
)
844-
WHERE "row_number" = 1
783+
e.*,
784+
{% for feature in featureview.features %}
785+
v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
786+
{% endfor %},
787+
v."{{ featureview.timestamp_field }}"
788+
FROM "{{ featureview.name }}__entity_dataframe" e
789+
ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v
790+
MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}")
791+
USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %})
845792
),
846793
847794
/*
848-
4. Once we know the latest value of each feature for a given timestamp,
849-
we can join again the data back to the original "base" dataset
795+
4. If a TTL is configured, filter the CTE to remove rows where the feature values are older than the configured TTL.
850796
*/
851-
"{{ featureview.name }}__cleaned" AS (
852-
SELECT "base".*
853-
FROM "{{ featureview.name }}__base" AS "base"
854-
INNER JOIN "{{ featureview.name }}__latest"
855-
USING(
856-
"{{featureview.name}}__entity_row_unique_id",
857-
"event_timestamp"
858-
{% if featureview.created_timestamp_column %}
859-
,"created_timestamp"
860-
{% endif %}
861-
)
862-
){% if loop.last %}{% else %}, {% endif %}
863797
798+
"{{ featureview.name }}__ttl" AS (
799+
SELECT *
800+
FROM "{{ featureview.name }}__asof_join"
801+
{% if featureview.ttl == 0 %}{% else %}
802+
WHERE "{{ featureview.timestamp_field }}" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp")
803+
{% endif %}
804+
){% if loop.last %}{% else %}, {% endif %}
864805
865806
{% endfor %}
866807
/*
867-
Joins the outputs of multiple time travel joins to a single table.
808+
Join the outputs of multiple time travel joins to a single table.
868809
The entity_dataframe dataset is our source of truth here.
869810
*/
870811
@@ -877,7 +818,7 @@ def _get_entity_df_event_timestamp_range(
877818
{% for feature in featureview.features %}
878819
,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}
879820
{% endfor %}
880-
FROM "{{ featureview.name }}__cleaned"
881-
) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id")
821+
FROM "{{ featureview.name }}__ttl"
822+
) "{{ featureview.name }}__ttl" USING ("{{featureview.name}}__entity_row_unique_id")
882823
{% endfor %}
883824
"""

0 commit comments

Comments
 (0)