Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use __subquery in Snowflake template for preparation
Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>
  • Loading branch information
hkuepers committed Dec 19, 2024
commit f37b52a6df64f7964ec0c8d533a4e0210906b66f
35 changes: 23 additions & 12 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,22 @@ def _get_entity_df_event_timestamp_range(
),

/*
2. If the `created_timestamp_column` has been set, we need to
2. Use a subquery to prepare the event_timestamp, created_timestamp, entity, and feature columns.
*/

"{{ featureview.name }}__subquery" AS (
SELECT
"{{ featureview.timestamp_field }}" as "event_timestamp",
{{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }}
{{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
),

/*
3. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_timestamp)` for each combination of entity keys and event_timestamp.
Otherwise, the ASOF JOIN can have unstable side effects
Expand All @@ -766,33 +781,29 @@ def _get_entity_df_event_timestamp_range(
{% if featureview.created_timestamp_column %}
"{{ featureview.name }}__dedup" AS (
SELECT
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
"event_timestamp",
*,
MAX("created_timestamp") AS "created_timestamp"
FROM {{ featureview.table_subquery }}
FROM "{{ featureview.name }}__subquery"
GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp"
),
{% endif %}

/*
3. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe.
4. ASOF JOIN the deduplicated feature CTE (or the prepared subquery CTE when no created timestamp column is set) onto the reduced entity dataframe.
*/

"{{ featureview.name }}__asof_join" AS (
SELECT
e.*,
{% for feature in featureview.features %}
v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %},
v."{{ featureview.timestamp_field }}"
v.*
FROM "{{ featureview.name }}__entity_dataframe" e
ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v
MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}")
ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}"{{ featureview.name }}__subquery"{% endif %} v
MATCH_CONDITION (e."entity_timestamp" >= v."event_timestamp")
{% if featureview.entities %} USING({{ featureview.entities | map('tojson') | join(', ')}}) {% endif %}
),

/*
4. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl.
5. If TTL is configured, filter the CTE to remove rows where the feature values are older than the configured TTL.
*/

"{{ featureview.name }}__ttl" AS (
Expand Down