From d1fe30e421756dc443b7b68ecc07a52e03ab3ef9 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Wed, 4 Mar 2026 12:15:14 +0530 Subject: [PATCH 1/9] Optimizing Date-Range Queries Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 226 ++++++++++++++---- .../postgres_offline_store/test_postgres.py | 159 +++++++++--- 2 files changed, 308 insertions(+), 77 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 7c65b649046..aed375f9120 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -159,6 +159,21 @@ def get_historical_features( else: start_date = make_tzaware(start_date) + # Compute lookback_start_date for LOCF: pull feature data + # from (start_date - max_ttl) so window functions can + # forward-fill the last observation before start_date. 
+ max_ttl_seconds = 0 + for fv in feature_views: + if fv.ttl and isinstance(fv.ttl, timedelta): + max_ttl_seconds = max( + max_ttl_seconds, int(fv.ttl.total_seconds()) + ) + lookback_start_date: Optional[datetime] = ( + start_date - timedelta(seconds=max_ttl_seconds) + if max_ttl_seconds > 0 + else start_date + ) + entity_df = pd.DataFrame( { "event_timestamp": pd.date_range( @@ -166,6 +181,8 @@ def get_historical_features( )[:1] # Just one row } ) + else: + lookback_start_date = None entity_schema = _get_entity_schema(entity_df, config) @@ -230,6 +247,7 @@ def query_generator() -> Iterator[str]: use_cte=use_cte, start_date=start_date, end_date=end_date, + lookback_start_date=lookback_start_date, ) finally: # Only cleanup if we created a table @@ -444,6 +462,7 @@ def build_point_in_time_query( use_cte: bool = False, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + lookback_start_date: Optional[datetime] = None, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for PostgreSQL""" template = Environment(loader=BaseLoader()).from_string(source=query_template) @@ -474,6 +493,7 @@ def build_point_in_time_query( "use_cte": use_cte, "start_date": start_date, "end_date": end_date, + "lookback_start_date": lookback_start_date, } query = template.render(template_context) @@ -536,30 +556,68 @@ def _get_entity_schema( AND "{{ featureviews[0].timestamp_field }}" >= '{{ featureviews[0].min_event_timestamp }}' {% endif %} {% else %} +{# --- LOCF (Last Observation Carried Forward) path for multi-FV date-range --- #} + +{# Collect deduplicated entity list across all FVs #} +{% set all_entities = [] %} +{% for featureview in featureviews %} + {% for entity in featureview.entities %} + {% if entity not in all_entities %} + {% set _ = all_entities.append(entity) %} + {% endif %} + {% endfor %} +{% endfor %} + +{# Build list of output feature names per FV for consistent column ordering #} +{% set all_feature_cols 
= [] %} +{% for featureview in featureviews %} + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set _ = all_feature_cols.append(featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature)) %} + {% else %} + {% set _ = all_feature_cols.append(featureview.field_mapping.get(feature, feature)) %} + {% endif %} + {% endfor %} +{% endfor %} + WITH +{# --- Per-FV __data: pull feature rows from lookback_start_date..end_date --- #} {% for featureview in featureviews %} -"{{ featureview.name }}__data" AS ( +"{{ featureview.name }}__data_raw" AS ( SELECT "{{ featureview.timestamp_field }}" AS event_timestamp, - {% if featureview.created_timestamp_column %} - "{{ featureview.created_timestamp_column }}" AS created_timestamp, - {% endif %} {% for entity in featureview.entities %} "{{ entity }}", {% endfor %} {% for feature in featureview.features %} - "{{ feature }}" AS {% if full_feature_names %}"{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if not loop.last %},{% endif %} + "{{ feature }}" AS "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %} {% endfor %} + {% if featureview.created_timestamp_column %} + , "{{ featureview.created_timestamp_column }}" AS created_timestamp + {% endif %} FROM {{ featureview.table_subquery }} AS sub - WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ start_date }}' AND '{{ end_date }}' - {% if featureview.ttl != 0 and featureview.min_event_timestamp %} - AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' - {% endif %} + WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ lookback_start_date or start_date }}' AND '{{ end_date }}' +), +{% if featureview.created_timestamp_column %} +"{{ 
featureview.name }}__data" AS ( + SELECT * FROM ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}", {% endfor %}event_timestamp + ORDER BY created_timestamp DESC + ) AS __rn + FROM "{{ featureview.name }}__data_raw" + ) __dedup WHERE __rn = 1 ), +{% else %} +"{{ featureview.name }}__data" AS ( + SELECT * FROM "{{ featureview.name }}__data_raw" +), +{% endif %} {% endfor %} --- Create a base query with all unique entity + timestamp combinations -base_entities AS ( +{# --- Spine: prediction timeline = distinct (entity, timestamp) in [start_date, end_date] --- #} +spine AS ( {% for featureview in featureviews %} SELECT DISTINCT event_timestamp, @@ -567,58 +625,132 @@ def _get_entity_schema( "{{ entity }}"{% if not loop.last %},{% endif %} {% endfor %} FROM "{{ featureview.name }}__data" + WHERE event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION {% endif %} {% endfor %} -) +), + +{# --- CTE 1: Stack spine + feature rows via UNION ALL --- #} +stacked AS ( + {# Spine rows: is_spine=1, feature_anchor NULL, all features NULL, per-FV timestamps NULL #} + SELECT + event_timestamp, + {% for entity in all_entities %} + "{{ entity }}", + {% endfor %} + 1 AS is_spine, + NULL::int AS feature_anchor + {% for feat in all_feature_cols %} + , NULL AS "{{ feat }}" + {% endfor %} + {% for featureview in featureviews %} + , NULL::timestamptz AS "{{ featureview.name }}__feature_ts" + {% endfor %} + FROM spine -SELECT - base.event_timestamp, - {% set all_entities = [] %} {% for featureview in featureviews %} - {% for entity in featureview.entities %} - {% if entity not in all_entities %} - {% set _ = all_entities.append(entity) %} + UNION ALL + {# Feature rows for {{ featureview.name }}: is_spine=0, feature_anchor=1 #} + SELECT + event_timestamp, + {% for entity in all_entities %} + {% if entity in featureview.entities %} + "{{ entity }}", + {% else %} + NULL AS "{{ entity }}", + {% endif %} + 
{% endfor %} + 0 AS is_spine, + 1 AS feature_anchor + {# Emit this FV's features; NULL for other FVs' features #} + {% for fv_inner in featureviews %} + {% for feature in fv_inner.features %} + {% if full_feature_names %} + {% set col_name = fv_inner.name ~ '__' ~ fv_inner.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = fv_inner.field_mapping.get(feature, feature) %} + {% endif %} + {% if fv_inner.name == featureview.name %} + , "{{ col_name }}" + {% else %} + , NULL AS "{{ col_name }}" + {% endif %} + {% endfor %} + {% endfor %} + {# Per-FV feature timestamp: this FV gets event_timestamp, others NULL #} + {% for fv_inner in featureviews %} + {% if fv_inner.name == featureview.name %} + , event_timestamp AS "{{ fv_inner.name }}__feature_ts" + {% else %} + , NULL::timestamptz AS "{{ fv_inner.name }}__feature_ts" {% endif %} {% endfor %} + FROM "{{ featureview.name }}__data" {% endfor %} +), + +{# --- CTE 2: Group boundaries via COUNT (only feature rows increment) --- #} +stacked_with_group AS ( + SELECT *, + COUNT(feature_anchor) OVER ( + PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} + ORDER BY event_timestamp ASC, is_spine ASC + ) AS value_group_id + FROM stacked +), + +{# --- CTE 3: Forward-fill features + feature timestamps via FIRST_VALUE --- #} +filled AS ( + SELECT + event_timestamp, + {% for entity in all_entities %} + "{{ entity }}", + {% endfor %} + is_spine + {% for feat in all_feature_cols %} + , FIRST_VALUE("{{ feat }}") OVER ( + PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + ORDER BY event_timestamp ASC, is_spine ASC + ) AS "{{ feat }}" + {% endfor %} + {% for featureview in featureviews %} + , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( + PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + ORDER BY 
event_timestamp ASC, is_spine ASC + ) AS "{{ featureview.name }}__filled_ts" + {% endfor %} + FROM stacked_with_group +) + +{# --- CTE 4: Filter spine + TTL validation --- #} +SELECT + event_timestamp, {% for entity in all_entities %} - base."{{ entity }}", + "{{ entity }}", {% endfor %} {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} - {% set feature_counter = namespace(count=0) %} + {% set feat_idx = namespace(count=0) %} {% for featureview in featureviews %} - {% set outer_loop_index = loop.index0 %} - {% for feature in featureview.features %} - {% set feature_counter.count = feature_counter.count + 1 %} - fv_{{ outer_loop_index }}."{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if feature_counter.count < total_features %},{% endif %} - {% endfor %} - {% endfor %} -FROM base_entities base -{% for featureview in featureviews %} -{% set outer_loop_index = loop.index0 %} -LEFT JOIN LATERAL ( - SELECT DISTINCT ON ({% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}) - event_timestamp, - {% for entity in featureview.entities %} - "{{ entity }}", - {% endfor %} {% for feature in featureview.features %} - "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %} + {% set feat_idx.count = feat_idx.count + 1 %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + {% if featureview.ttl != 0 %} + CASE WHEN (event_timestamp - "{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second + THEN "{{ col_name }}" 
ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + {% else %} + "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + {% endif %} {% endfor %} - FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }} - WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp - {% if featureview.ttl != 0 %} - AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second - {% endif %} - {% for entity in featureview.entities %} - AND fv_sub_{{ outer_loop_index }}."{{ entity }}" = base."{{ entity }}" {% endfor %} - ORDER BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, event_timestamp DESC -) AS fv_{{ outer_loop_index }} ON true -{% endfor %} -ORDER BY base.event_timestamp +FROM filled +WHERE is_spine = 1 +ORDER BY event_timestamp {% endif %} {% else %} WITH diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index fad837e4c16..9f00f9589cd 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -781,44 +781,143 @@ def test_sql_template_ttl_filtering(self): # Should not include TTL filtering when TTL is 0 or min_event_timestamp is None assert 'AND "event_timestamp" >=' not in query_no_ttl - def test_lateral_join_ttl_constraints(self): - """Test that LATERAL JOINs include proper TTL constraints""" - from jinja2 import BaseLoader, Environment + def test_locf_template_multi_fv_date_range(self): + """Test that multi-FV date-range uses LOCF (no LATERAL) with correct CTEs""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + 
build_point_in_time_query, + ) - lateral_template = """ - FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }} - WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp - {% if featureview.ttl != 0 %} - AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second - {% endif %} - """ + fv_contexts = [ + { + "name": "fv1", + "ttl": 86400, + "entities": ["entity1_id"], + "features": ["feat_a"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"public"."fv1_table"', + "entity_selections": ['"entity1_id" AS "entity1_id"'], + "min_event_timestamp": "2023-01-01T00:00:00", + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "fv2", + "ttl": 3600, + "entities": ["entity1_id"], + "features": ["feat_b", "feat_c"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"public"."fv2_table"', + "entity_selections": ['"entity1_id" AS "entity1_id"'], + "min_event_timestamp": "2023-01-06T23:00:00", + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] - template = Environment(loader=BaseLoader()).from_string(source=lateral_template) + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + lookback = start - timedelta(seconds=86400) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=lookback, + ) - # Test with TTL - context = { - "featureview": { - "name": "user_features", - "ttl": 86400, # 1 day - }, - "outer_loop_index": 0, - } + # No LATERAL joins + assert "LATERAL" not 
in query + + # LOCF CTEs present + assert "stacked" in query.lower() + assert "stacked_with_group" in query.lower() + assert "filled" in query.lower() + assert "spine" in query.lower() - query = template.render(context) + # Correct ORDER BY for deterministic grouping + assert "is_spine ASC" in query + + # FIRST_VALUE used for forward-fill + assert "FIRST_VALUE" in query + + # TTL CASE for fv1 (ttl=86400) assert "86400 * interval" in query - assert "base.event_timestamp -" in query + # TTL CASE for fv2 (ttl=3600) + assert "3600 * interval" in query - # Test without TTL - context_no_ttl = { - "featureview": { - "name": "user_features", - "ttl": 0, # No TTL + # lookback_start_date used in feature __data range + assert str(lookback.date()) in query + + # is_spine filter + assert "is_spine = 1" in query + + def test_locf_template_no_ttl(self): + """Test LOCF template with TTL=0 does not emit CASE for TTL""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + fv_contexts = [ + { + "name": "fv1", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv1_t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, }, - "outer_loop_index": 0, - } + { + "name": "fv2", + "ttl": 0, + "entities": ["eid"], + "features": ["f2"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv2_t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] - query_no_ttl = template.render(context_no_ttl) - assert "interval" not in query_no_ttl + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = 
datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # No LATERAL, no TTL CASE + assert "LATERAL" not in query + assert "CASE WHEN" not in query + assert "FIRST_VALUE" in query def test_api_non_entity_functionality(self): """Test that FeatureStore API accepts non-entity parameters correctly""" From 4bde7dd8a20d67b50150a5416599577fc4813345 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Wed, 4 Mar 2026 16:38:53 +0530 Subject: [PATCH 2/9] fix issues Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 115 +++++++++--------- .../postgres_offline_store/test_postgres.py | 22 +++- 2 files changed, 76 insertions(+), 61 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index aed375f9120..770f488aefe 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -632,103 +632,97 @@ def _get_entity_schema( {% endfor %} ), -{# --- CTE 1: Stack spine + feature rows via UNION ALL --- #} -stacked AS ( - {# Spine rows: is_spine=1, feature_anchor NULL, all features NULL, per-FV timestamps NULL #} +{# --- Per-FV independent LOCF forward-fill pipelines --- #} +{# Each FV is stacked, grouped, and filled independently to prevent #} +{# cross-FV interference when multiple FVs share overlapping timestamps. 
#} +{% for featureview in featureviews %} +"{{ featureview.name }}__stacked" AS ( SELECT - event_timestamp, + s.event_timestamp, {% for entity in all_entities %} - "{{ entity }}", + s."{{ entity }}", {% endfor %} 1 AS is_spine, NULL::int AS feature_anchor - {% for feat in all_feature_cols %} - , NULL AS "{{ feat }}" + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + , NULL AS "{{ col_name }}" {% endfor %} - {% for featureview in featureviews %} , NULL::timestamptz AS "{{ featureview.name }}__feature_ts" - {% endfor %} - FROM spine + FROM spine s - {% for featureview in featureviews %} UNION ALL - {# Feature rows for {{ featureview.name }}: is_spine=0, feature_anchor=1 #} + SELECT - event_timestamp, + d.event_timestamp, {% for entity in all_entities %} {% if entity in featureview.entities %} - "{{ entity }}", + d."{{ entity }}", {% else %} NULL AS "{{ entity }}", {% endif %} {% endfor %} 0 AS is_spine, 1 AS feature_anchor - {# Emit this FV's features; NULL for other FVs' features #} - {% for fv_inner in featureviews %} - {% for feature in fv_inner.features %} - {% if full_feature_names %} - {% set col_name = fv_inner.name ~ '__' ~ fv_inner.field_mapping.get(feature, feature) %} - {% else %} - {% set col_name = fv_inner.field_mapping.get(feature, feature) %} - {% endif %} - {% if fv_inner.name == featureview.name %} - , "{{ col_name }}" - {% else %} - , NULL AS "{{ col_name }}" - {% endif %} - {% endfor %} - {% endfor %} - {# Per-FV feature timestamp: this FV gets event_timestamp, others NULL #} - {% for fv_inner in featureviews %} - {% if fv_inner.name == featureview.name %} - , event_timestamp AS "{{ fv_inner.name }}__feature_ts" + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ 
featureview.field_mapping.get(feature, feature) %} {% else %} - , NULL::timestamptz AS "{{ fv_inner.name }}__feature_ts" + {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} + , d."{{ col_name }}" {% endfor %} - FROM "{{ featureview.name }}__data" - {% endfor %} + , d.event_timestamp AS "{{ featureview.name }}__feature_ts" + FROM "{{ featureview.name }}__data" d ), -{# --- CTE 2: Group boundaries via COUNT (only feature rows increment) --- #} -stacked_with_group AS ( +"{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} ORDER BY event_timestamp ASC, is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id - FROM stacked + FROM "{{ featureview.name }}__stacked" ), -{# --- CTE 3: Forward-fill features + feature timestamps via FIRST_VALUE --- #} -filled AS ( +"{{ featureview.name }}__filled" AS ( SELECT event_timestamp, {% for entity in all_entities %} "{{ entity }}", {% endfor %} is_spine - {% for feat in all_feature_cols %} - , FIRST_VALUE("{{ feat }}") OVER ( + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + , FIRST_VALUE("{{ col_name }}") OVER ( PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC - ) AS "{{ feat }}" + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS "{{ col_name }}" {% endfor %} - {% for featureview in featureviews %} , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, 
is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ featureview.name }}__filled_ts" - {% endfor %} - FROM stacked_with_group -) + FROM "{{ featureview.name }}__grouped" +){% if not loop.last %},{% endif %} +{% endfor %} -{# --- CTE 4: Filter spine + TTL validation --- #} +{# --- Final: join per-FV filled results back onto spine --- #} SELECT - event_timestamp, + spine.event_timestamp, {% for entity in all_entities %} - "{{ entity }}", + spine."{{ entity }}", {% endfor %} {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} {% set feat_idx = namespace(count=0) %} @@ -741,16 +735,23 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} {% if featureview.ttl != 0 %} - CASE WHEN (event_timestamp - "{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second - THEN "{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second + THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} {% else %} - "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + "{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} {% endif %} {% endfor %} {% endfor %} -FROM filled -WHERE is_spine = 1 -ORDER BY event_timestamp +FROM spine +{% for featureview in featureviews %} +LEFT JOIN "{{ featureview.name }}__filled" AS "{{ featureview.name }}__f" + ON spine.event_timestamp = "{{ featureview.name }}__f".event_timestamp + {% for entity in all_entities %} + AND spine."{{ entity }}" IS NOT DISTINCT FROM "{{ featureview.name }}__f"."{{ entity }}" + {% endfor %} + AND "{{ featureview.name }}__f".is_spine = 1 +{% endfor %} +ORDER BY 
spine.event_timestamp {% endif %} {% else %} WITH diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index 9f00f9589cd..ac17cfdf8d5 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -838,18 +838,32 @@ def test_locf_template_multi_fv_date_range(self): # No LATERAL joins assert "LATERAL" not in query - # LOCF CTEs present - assert "stacked" in query.lower() - assert "stacked_with_group" in query.lower() - assert "filled" in query.lower() + # Per-FV LOCF CTEs present (independent forward-fill per feature view) + assert "fv1__stacked" in query + assert "fv1__grouped" in query + assert "fv1__filled" in query + assert "fv2__stacked" in query + assert "fv2__grouped" in query + assert "fv2__filled" in query assert "spine" in query.lower() + # No global stacked/stacked_with_group/filled (replaced by per-FV CTEs) + assert '"stacked_with_group"' not in query + # Correct ORDER BY for deterministic grouping assert "is_spine ASC" in query + # Explicit ROWS framing for deterministic window functions + assert "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" in query + # FIRST_VALUE used for forward-fill assert "FIRST_VALUE" in query + # Per-FV filled results joined back to spine + assert "LEFT JOIN" in query + assert '"fv1__f"' in query or "fv1__filled" in query + assert '"fv2__f"' in query or "fv2__filled" in query + # TTL CASE for fv1 (ttl=86400) assert "86400 * interval" in query # TTL CASE for fv2 (ttl=3600) From 02e846bc477eb9d5d733817a35cf4054e397d786 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Wed, 4 Mar 2026 16:40:05 +0530 Subject: [PATCH 3/9] fix lint Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- 
.../offline_stores/contrib/postgres_offline_store/postgres.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 770f488aefe..cfe98953b63 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -165,9 +165,7 @@ def get_historical_features( max_ttl_seconds = 0 for fv in feature_views: if fv.ttl and isinstance(fv.ttl, timedelta): - max_ttl_seconds = max( - max_ttl_seconds, int(fv.ttl.total_seconds()) - ) + max_ttl_seconds = max(max_ttl_seconds, int(fv.ttl.total_seconds())) lookback_start_date: Optional[datetime] = ( start_date - timedelta(seconds=max_ttl_seconds) if max_ttl_seconds > 0 From 0794b112ae5ba75e14fe04e16f167489c16e2f07 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Tue, 10 Mar 2026 02:39:14 +0530 Subject: [PATCH 4/9] featureview entities Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../contrib/postgres_offline_store/postgres.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index cfe98953b63..b977d56b38a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -681,7 +681,7 @@ def _get_entity_schema( "{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( - PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last 
%}, {% endif %}{% endfor %} ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id @@ -702,13 +702,13 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} , FIRST_VALUE("{{ col_name }}") OVER ( - PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ col_name }}" {% endfor %} , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( - PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ featureview.name }}__filled_ts" From 4383dba2fbf7a9a4de8941459ccc87ceb34bf01b Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Tue, 10 Mar 2026 03:01:31 +0530 Subject: [PATCH 5/9] fix LOCF spine UNION and empty-entity SQL Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 45 +++++++------------ 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index b977d56b38a..77103ddda76 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -615,15 +615,14 @@ def _get_entity_schema( {% endfor 
%} {# --- Spine: prediction timeline = distinct (entity, timestamp) in [start_date, end_date] --- #} +{# Each UNION branch selects all_entities so column count/order match; NULL for entities not in this FV. #} spine AS ( {% for featureview in featureviews %} SELECT DISTINCT - event_timestamp, - {% for entity in featureview.entities %} - "{{ entity }}"{% if not loop.last %},{% endif %} - {% endfor %} - FROM "{{ featureview.name }}__data" - WHERE event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' + d.event_timestamp{% for entity in all_entities %}, + {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %} + FROM "{{ featureview.name }}__data" d + WHERE d.event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION {% endif %} @@ -636,10 +635,8 @@ def _get_entity_schema( {% for featureview in featureviews %} "{{ featureview.name }}__stacked" AS ( SELECT - s.event_timestamp, - {% for entity in all_entities %} - s."{{ entity }}", - {% endfor %} + s.event_timestamp{% for entity in all_entities %}, + s."{{ entity }}"{% endfor %}, 1 AS is_spine, NULL::int AS feature_anchor {% for feature in featureview.features %} @@ -656,14 +653,8 @@ def _get_entity_schema( UNION ALL SELECT - d.event_timestamp, - {% for entity in all_entities %} - {% if entity in featureview.entities %} - d."{{ entity }}", - {% else %} - NULL AS "{{ entity }}", - {% endif %} - {% endfor %} + d.event_timestamp{% for entity in all_entities %}, + {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %}, 0 AS is_spine, 1 AS feature_anchor {% for feature in featureview.features %} @@ -681,7 +672,7 @@ def _get_entity_schema( "{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( - PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} + PARTITION BY {% if featureview.entities 
%}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}1{% endif %} ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id @@ -690,10 +681,8 @@ def _get_entity_schema( "{{ featureview.name }}__filled" AS ( SELECT - event_timestamp, - {% for entity in all_entities %} - "{{ entity }}", - {% endfor %} + event_timestamp{% for entity in all_entities %}, + "{{ entity }}"{% endfor %}, is_spine {% for feature in featureview.features %} {% if full_feature_names %} @@ -702,13 +691,13 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} , FIRST_VALUE("{{ col_name }}") OVER ( - PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ col_name }}" {% endfor %} , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( - PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ featureview.name }}__filled_ts" @@ -718,10 +707,8 @@ def _get_entity_schema( {# --- Final: join per-FV filled results back onto spine --- #} SELECT - spine.event_timestamp, - {% for entity in all_entities %} - spine."{{ entity }}", - {% endfor %} + spine.event_timestamp{% for entity in all_entities %}, + spine."{{ entity }}"{% endfor %}, {% set 
total_features = featureviews|map(attribute='features')|map('length')|sum %} {% set feat_idx = namespace(count=0) %} {% for featureview in featureviews %} From 4980d80ee8eaa7564a3a4613263618cba0f180a8 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 16:50:25 +0530 Subject: [PATCH 6/9] added fixes Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 72 ++++++++----------- 1 file changed, 28 insertions(+), 44 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 77103ddda76..38965eaa15e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -139,17 +139,18 @@ def get_historical_features( end_date = _utc_now() else: end_date = make_tzaware(end_date) + # Find the maximum TTL across all feature views to ensure we capture enough data + max_ttl_seconds = max( + ( + int(fv.ttl.total_seconds()) + for fv in feature_views + if fv.ttl and isinstance(fv.ttl, timedelta) + ), + default=0, + ) # Calculate start_date from TTL if not provided - if start_date is None: - # Find the maximum TTL across all feature views to ensure we capture enough data - max_ttl_seconds = 0 - for fv in feature_views: - if fv.ttl and isinstance(fv.ttl, timedelta): - ttl_seconds = int(fv.ttl.total_seconds()) - max_ttl_seconds = max(max_ttl_seconds, ttl_seconds) - if max_ttl_seconds > 0: # Start from (end_date - max_ttl) to ensure we capture all relevant features start_date = end_date - timedelta(seconds=max_ttl_seconds) @@ -162,25 +163,18 @@ def get_historical_features( # Compute lookback_start_date for LOCF: pull feature data # from (start_date - max_ttl) so window functions can # forward-fill the last observation before 
start_date. - max_ttl_seconds = 0 - for fv in feature_views: - if fv.ttl and isinstance(fv.ttl, timedelta): - max_ttl_seconds = max(max_ttl_seconds, int(fv.ttl.total_seconds())) lookback_start_date: Optional[datetime] = ( start_date - timedelta(seconds=max_ttl_seconds) if max_ttl_seconds > 0 else start_date ) - entity_df = pd.DataFrame( - { - "event_timestamp": pd.date_range( - start=start_date, end=end_date, freq="1s", tz=timezone.utc - )[:1] # Just one row - } - ) + # Single row with end_date + entity_df = pd.DataFrame({"event_timestamp": [end_date]}) + skip_entity_upload = True else: lookback_start_date = None + skip_entity_upload = False entity_schema = _get_entity_schema(entity_df, config) @@ -204,7 +198,12 @@ def query_generator() -> Iterator[str]: and config.offline_store.entity_select_mode == EntitySelectMode.embed_query ) - if use_cte: + if skip_entity_upload: + # LOCF path never uses left_table + left_table_query_string = ( + "(SELECT NULL::timestamptz AS event_timestamp LIMIT 0)" + ) + elif use_cte: left_table_query_string = entity_df else: left_table_query_string = table_name @@ -248,9 +247,10 @@ def query_generator() -> Iterator[str]: lookback_start_date=lookback_start_date, ) finally: - # Only cleanup if we created a table + # Only cleanup if we created a table (not when skip_entity_upload) if ( - config.offline_store.entity_select_mode + not skip_entity_upload + and config.offline_store.entity_select_mode == EntitySelectMode.temp_table ): with _get_conn(config.offline_store) as conn, conn.cursor() as cur: @@ -566,18 +566,6 @@ def _get_entity_schema( {% endfor %} {% endfor %} -{# Build list of output feature names per FV for consistent column ordering #} -{% set all_feature_cols = [] %} -{% for featureview in featureviews %} - {% for feature in featureview.features %} - {% if full_feature_names %} - {% set _ = all_feature_cols.append(featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature)) %} - {% else %} - {% set _ = 
all_feature_cols.append(featureview.field_mapping.get(feature, feature)) %} - {% endif %} - {% endfor %} -{% endfor %} - WITH {# --- Per-FV __data: pull feature rows from lookback_start_date..end_date --- #} {% for featureview in featureviews %} @@ -607,10 +595,6 @@ def _get_entity_schema( FROM "{{ featureview.name }}__data_raw" ) __dedup WHERE __rn = 1 ), -{% else %} -"{{ featureview.name }}__data" AS ( - SELECT * FROM "{{ featureview.name }}__data_raw" -), {% endif %} {% endfor %} @@ -621,7 +605,7 @@ def _get_entity_schema( SELECT DISTINCT d.event_timestamp{% for entity in all_entities %}, {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %} - FROM "{{ featureview.name }}__data" d + FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d WHERE d.event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION @@ -666,13 +650,13 @@ def _get_entity_schema( , d."{{ col_name }}" {% endfor %} , d.event_timestamp AS "{{ featureview.name }}__feature_ts" - FROM "{{ featureview.name }}__data" d + FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d ), "{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( - PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}1{% endif %} + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}(SELECT NULL){% endif %} ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id @@ -720,7 +704,7 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} {% if featureview.ttl != 0 %} - CASE WHEN (spine.event_timestamp - "{{ featureview.name 
}}__f"."{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second + CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= make_interval(secs => {{ featureview.ttl }}) THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} {% else %} "{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} @@ -816,7 +800,7 @@ def _get_entity_schema( FROM {{ featureview.table_subquery }} AS sub WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe) {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second + AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - make_interval(secs => {{ featureview.ttl }}) {% endif %} ), @@ -831,7 +815,7 @@ def _get_entity_schema( AND subquery.event_timestamp <= entity_dataframe.entity_timestamp {% if featureview.ttl == 0 %}{% else %} - AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - {{ featureview.ttl }} * interval '1' second + AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - make_interval(secs => {{ featureview.ttl }}) {% endif %} {% for entity in featureview.entities %} From cf3c756a753da7d04b9032f20ed2419fe83cfc64 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 16:52:01 +0530 Subject: [PATCH 7/9] updates test Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/test_postgres.py | 224 +++++++++++++++++- 1 file changed, 220 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py 
b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index ac17cfdf8d5..c2e8e3cd8f4 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -864,10 +864,9 @@ def test_locf_template_multi_fv_date_range(self): assert '"fv1__f"' in query or "fv1__filled" in query assert '"fv2__f"' in query or "fv2__filled" in query - # TTL CASE for fv1 (ttl=86400) - assert "86400 * interval" in query - # TTL CASE for fv2 (ttl=3600) - assert "3600 * interval" in query + # TTL CASE for fv1 (ttl=86400) and fv2 (ttl=3600) use make_interval + assert "make_interval(secs => 86400)" in query + assert "make_interval(secs => 3600)" in query # lookback_start_date used in feature __data range assert str(lookback.date()) in query @@ -933,6 +932,223 @@ def test_locf_template_no_ttl(self): assert "CASE WHEN" not in query assert "FIRST_VALUE" in query + def test_locf_template_different_entity_sets(self): + """Different entity sets across FVs: NULL AS entity and IS NOT DISTINCT FROM.""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # fv1: driver_id, fv2: customer_id -> spine has both, each FV fills its own + fv_contexts = [ + { + "name": "driver_fv", + "ttl": 0, + "entities": ["driver_id"], + "features": ["score"], + "field_mapping": {"score": "score"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"driver_table"', + "entity_selections": ['"driver_id" AS "driver_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "customer_fv", + "ttl": 0, + "entities": ["customer_id"], + "features": ["amount"], + "field_mapping": {"amount": "amount"}, + "timestamp_field": "ts", +
"created_timestamp_column": None, + "table_subquery": '"customer_table"', + "entity_selections": ['"customer_id" AS "customer_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # all_entities = driver_id, customer_id; each branch uses NULL for the other + assert 'NULL AS "driver_id"' in query or '"driver_id"' in query + assert 'NULL AS "customer_id"' in query or '"customer_id"' in query + assert "IS NOT DISTINCT FROM" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_entityless_feature_view(self): + """Entityless FV: PARTITION BY (SELECT NULL) fallback produces valid SQL (multi-FV LOCF path).""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # Two FVs so template uses LOCF path (single FV uses simple SELECT branch) + fv_contexts = [ + { + "name": "global_fv", + "ttl": 0, + "entities": [], + "features": ["global_feat"], + "field_mapping": {"global_feat": "global_feat"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"global_table"', + "entity_selections": [], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "driver_fv", + "ttl": 0, + "entities": ["driver_id"], + "features": ["score"], + "field_mapping": {"score": "score"}, + "timestamp_field": "ts", + "created_timestamp_column": None, +
"table_subquery": '"driver_table"', + "entity_selections": ['"driver_id" AS "driver_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # Entityless FV uses PARTITION BY (SELECT NULL) for single partition (more standard than PARTITION BY 1) + assert "PARTITION BY (SELECT NULL)" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_with_created_timestamp_column(self): + """created_timestamp_column set: dedup CTE __data from __data_raw via ROW_NUMBER (multi-FV LOCF path).""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # Two FVs so template uses LOCF path (single FV uses simple SELECT branch) + fv_contexts = [ + { + "name": "fv_dedup", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {"f1": "f1"}, + "timestamp_field": "ts", + "created_timestamp_column": "created_ts", + "table_subquery": '"fv_table"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "fv2", + "ttl": 0, + "entities": ["eid"], + "features": ["f2"], + "field_mapping": {"f2": "f2"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv2_table"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": 
"2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + assert "fv_dedup__data_raw" in query + assert "fv_dedup__data" in query + assert "ROW_NUMBER()" in query + assert "__dedup" in query + assert "created_ts" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_full_feature_names(self): + """full_feature_names=True: featureview.name__feature column naming.""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + fv_contexts = [ + { + "name": "my_fv", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {"f1": "f1"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=True, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + assert "my_fv__f1" in query or '"my_fv__f1"' in query + sqlglot.parse(query, dialect="postgres") + def 
test_api_non_entity_functionality(self): """Test that FeatureStore API accepts non-entity parameters correctly""" from feast import FeatureStore From ae29d2a8d2d071258b0fadcd922edc8abeff456e Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 17:08:26 +0530 Subject: [PATCH 8/9] fix entity_df range Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../contrib/postgres_offline_store/postgres.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 38965eaa15e..a8068708ae0 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -182,11 +182,18 @@ def get_historical_features( offline_utils.infer_event_timestamp_from_entity_df(entity_schema) ) - entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( - entity_df, - entity_df_event_timestamp_col, - config, - ) + # In non-entity mode, use the actual requested range so that + # min_event_timestamp (= range[0] - TTL) doesn't clip the window. + # The synthetic entity_df only has end_date, which would wrongly + # set min_event_timestamp to end_date - TTL instead of start_date - TTL. 
+ if skip_entity_upload and start_date is not None and end_date is not None: + entity_df_event_timestamp_range = (start_date, end_date) + else: + entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, + entity_df_event_timestamp_col, + config, + ) @contextlib.contextmanager def query_generator() -> Iterator[str]: From 84c13141c41423524d4ecebcff88a6db0ffbc0e1 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 18:32:40 +0530 Subject: [PATCH 9/9] updated postgres.py Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../offline_stores/contrib/postgres_offline_store/postgres.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index a8068708ae0..b761c656c8a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -182,10 +182,6 @@ def get_historical_features( offline_utils.infer_event_timestamp_from_entity_df(entity_schema) ) - # In non-entity mode, use the actual requested range so that - # min_event_timestamp (= range[0] - TTL) doesn't clip the window. - # The synthetic entity_df only has end_date, which would wrongly - # set min_event_timestamp to end_date - TTL instead of start_date - TTL. if skip_entity_upload and start_date is not None and end_date is not None: entity_df_event_timestamp_range = (start_date, end_date) else: