-
Notifications
You must be signed in to change notification settings - Fork 1.3k
perf(postgres): Optimizing feast offline Store for date-range multi-FV retrieval #6057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
309cdac
3da5af3
34eed9a
a4139b7
74a474c
6f754b8
7594205
f770c41
f235eb8
8c66e6f
25bc86d
b6a4703
d014b09
69565bb
b5d7c8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -639,103 +639,97 @@ def _get_entity_schema( | |
| {% endfor %} | ||
| ), | ||
|
|
||
| {# --- CTE 1: Stack spine + feature rows via UNION ALL --- #} | ||
| stacked AS ( | ||
| {# Spine rows: is_spine=1, feature_anchor NULL, all features NULL, per-FV timestamps NULL #} | ||
| {# --- Per-FV independent LOCF forward-fill pipelines --- #} | ||
| {# Each FV is stacked, grouped, and filled independently to prevent #} | ||
| {# cross-FV interference when multiple FVs share overlapping timestamps. #} | ||
| {% for featureview in featureviews %} | ||
| "{{ featureview.name }}__stacked" AS ( | ||
| SELECT | ||
| event_timestamp, | ||
| s.event_timestamp, | ||
| {% for entity in all_entities %} | ||
| "{{ entity }}", | ||
| s."{{ entity }}", | ||
| {% endfor %} | ||
| 1 AS is_spine, | ||
| NULL::int AS feature_anchor | ||
| {% for feat in all_feature_cols %} | ||
| , NULL AS "{{ feat }}" | ||
| {% for feature in featureview.features %} | ||
| {% if full_feature_names %} | ||
| {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} | ||
| {% else %} | ||
| {% set col_name = featureview.field_mapping.get(feature, feature) %} | ||
| {% endif %} | ||
| , NULL AS "{{ col_name }}" | ||
| {% endfor %} | ||
| {% for featureview in featureviews %} | ||
| , NULL::timestamptz AS "{{ featureview.name }}__feature_ts" | ||
| {% endfor %} | ||
| FROM spine | ||
| FROM spine s | ||
|
|
||
| {% for featureview in featureviews %} | ||
| UNION ALL | ||
| {# Feature rows for {{ featureview.name }}: is_spine=0, feature_anchor=1 #} | ||
|
|
||
| SELECT | ||
| event_timestamp, | ||
| d.event_timestamp, | ||
| {% for entity in all_entities %} | ||
| {% if entity in featureview.entities %} | ||
| "{{ entity }}", | ||
| d."{{ entity }}", | ||
| {% else %} | ||
| NULL AS "{{ entity }}", | ||
| {% endif %} | ||
| {% endfor %} | ||
| 0 AS is_spine, | ||
| 1 AS feature_anchor | ||
| {# Emit this FV's features; NULL for other FVs' features #} | ||
| {% for fv_inner in featureviews %} | ||
| {% for feature in fv_inner.features %} | ||
| {% if full_feature_names %} | ||
| {% set col_name = fv_inner.name ~ '__' ~ fv_inner.field_mapping.get(feature, feature) %} | ||
| {% else %} | ||
| {% set col_name = fv_inner.field_mapping.get(feature, feature) %} | ||
| {% endif %} | ||
| {% if fv_inner.name == featureview.name %} | ||
| , "{{ col_name }}" | ||
| {% else %} | ||
| , NULL AS "{{ col_name }}" | ||
| {% endif %} | ||
| {% endfor %} | ||
| {% endfor %} | ||
| {# Per-FV feature timestamp: this FV gets event_timestamp, others NULL #} | ||
| {% for fv_inner in featureviews %} | ||
| {% if fv_inner.name == featureview.name %} | ||
| , event_timestamp AS "{{ fv_inner.name }}__feature_ts" | ||
| {% for feature in featureview.features %} | ||
| {% if full_feature_names %} | ||
| {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} | ||
| {% else %} | ||
| , NULL::timestamptz AS "{{ fv_inner.name }}__feature_ts" | ||
| {% set col_name = featureview.field_mapping.get(feature, feature) %} | ||
| {% endif %} | ||
| , d."{{ col_name }}" | ||
| {% endfor %} | ||
| FROM "{{ featureview.name }}__data" | ||
| {% endfor %} | ||
| , d.event_timestamp AS "{{ featureview.name }}__feature_ts" | ||
| FROM "{{ featureview.name }}__data" d | ||
| ), | ||
|
|
||
| {# --- CTE 2: Group boundaries via COUNT (only feature rows increment) --- #} | ||
| stacked_with_group AS ( | ||
| "{{ featureview.name }}__grouped" AS ( | ||
| SELECT *, | ||
| COUNT(feature_anchor) OVER ( | ||
| PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} | ||
| ORDER BY event_timestamp ASC, is_spine ASC | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_group_id | ||
| FROM stacked | ||
| FROM "{{ featureview.name }}__stacked" | ||
|
devin-ai-integration[bot] marked this conversation as resolved.
|
||
| ), | ||
|
|
||
| {# --- CTE 3: Forward-fill features + feature timestamps via FIRST_VALUE --- #} | ||
| filled AS ( | ||
| "{{ featureview.name }}__filled" AS ( | ||
| SELECT | ||
| event_timestamp, | ||
| {% for entity in all_entities %} | ||
| "{{ entity }}", | ||
| {% endfor %} | ||
| is_spine | ||
| {% for feat in all_feature_cols %} | ||
| , FIRST_VALUE("{{ feat }}") OVER ( | ||
| {% for feature in featureview.features %} | ||
| {% if full_feature_names %} | ||
| {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} | ||
| {% else %} | ||
| {% set col_name = featureview.field_mapping.get(feature, feature) %} | ||
| {% endif %} | ||
| , FIRST_VALUE("{{ col_name }}") OVER ( | ||
| PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id | ||
| ORDER BY event_timestamp ASC, is_spine ASC | ||
| ) AS "{{ feat }}" | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS "{{ col_name }}" | ||
| {% endfor %} | ||
| {% for featureview in featureviews %} | ||
| , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( | ||
| PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id | ||
| ORDER BY event_timestamp ASC, is_spine ASC | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS "{{ featureview.name }}__filled_ts" | ||
| {% endfor %} | ||
| FROM stacked_with_group | ||
| ) | ||
| FROM "{{ featureview.name }}__grouped" | ||
| ){% if not loop.last %},{% endif %} | ||
| {% endfor %} | ||
|
|
||
| {# --- CTE 4: Filter spine + TTL validation --- #} | ||
| {# --- Final: join per-FV filled results back onto spine --- #} | ||
| SELECT | ||
| event_timestamp, | ||
| spine.event_timestamp, | ||
| {% for entity in all_entities %} | ||
| "{{ entity }}", | ||
| spine."{{ entity }}", | ||
| {% endfor %} | ||
| {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} | ||
| {% set feat_idx = namespace(count=0) %} | ||
|
|
@@ -748,16 +742,23 @@ def _get_entity_schema( | |
| {% set col_name = featureview.field_mapping.get(feature, feature) %} | ||
| {% endif %} | ||
| {% if featureview.ttl != 0 %} | ||
| CASE WHEN (event_timestamp - "{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second | ||
| THEN "{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} | ||
| CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The TTL enforcement via CASE is correct — when […] One edge case: if […] |
||
| THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} | ||
| {% else %} | ||
| "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} | ||
| "{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} | ||
| {% endif %} | ||
| {% endfor %} | ||
| {% endfor %} | ||
| FROM filled | ||
| WHERE is_spine = 1 | ||
| ORDER BY event_timestamp | ||
| FROM spine | ||
| {% for featureview in featureviews %} | ||
| LEFT JOIN "{{ featureview.name }}__filled" AS "{{ featureview.name }}__f" | ||
| ON spine.event_timestamp = "{{ featureview.name }}__f".event_timestamp | ||
| {% for entity in all_entities %} | ||
| AND spine."{{ entity }}" IS NOT DISTINCT FROM "{{ featureview.name }}__f"."{{ entity }}" | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good use of `IS NOT DISTINCT FROM` […] |
||
| {% endfor %} | ||
| AND "{{ featureview.name }}__f".is_spine = 1 | ||
| {% endfor %} | ||
| ORDER BY spine.event_timestamp | ||
| {% endif %} | ||
| {% else %} | ||
| WITH | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.