From 309cdac14996e5ff1197fe4f1b84ecc33e24e751 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Wed, 4 Mar 2026 12:15:14 +0530 Subject: [PATCH 01/12] Optimizing Date-Range Queries Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 262 ++++++++++++++---- .../postgres_offline_store/test_postgres.py | 159 +++++++++-- 2 files changed, 339 insertions(+), 82 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 50e48208647..39cdfd122d8 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -134,12 +134,55 @@ def get_historical_features( # Handle non-entity retrieval mode if entity_df is None: - start_date, end_date = compute_non_entity_date_range( - feature_views, - start_date=start_date, - end_date=end_date, + # Default to current time if end_date not provided + if end_date is None: + end_date = _utc_now() + else: + end_date = make_tzaware(end_date) + + # Calculate start_date from TTL if not provided + + if start_date is None: + # Find the maximum TTL across all feature views to ensure we capture enough data + max_ttl_seconds = 0 + for fv in feature_views: + if fv.ttl and isinstance(fv.ttl, timedelta): + ttl_seconds = int(fv.ttl.total_seconds()) + max_ttl_seconds = max(max_ttl_seconds, ttl_seconds) + + if max_ttl_seconds > 0: + # Start from (end_date - max_ttl) to ensure we capture all relevant features + start_date = end_date - timedelta(seconds=max_ttl_seconds) + else: + # If no TTL is set, default to 30 days before end_date + start_date = end_date - timedelta(days=30) + else: + start_date = make_tzaware(start_date) + + # Compute lookback_start_date for LOCF: pull feature data + # from (start_date - max_ttl) so window functions can + # forward-fill the last observation before start_date. + max_ttl_seconds = 0 + for fv in feature_views: + if fv.ttl and isinstance(fv.ttl, timedelta): + max_ttl_seconds = max( + max_ttl_seconds, int(fv.ttl.total_seconds()) + ) + lookback_start_date: Optional[datetime] = ( + start_date - timedelta(seconds=max_ttl_seconds) + if max_ttl_seconds > 0 + else start_date ) - entity_df = pd.DataFrame({"event_timestamp": [end_date]}) + + entity_df = pd.DataFrame( + { + "event_timestamp": pd.date_range( + start=start_date, end=end_date, freq="1s", tz=timezone.utc + )[:1] # Just one row + } + ) + else: + lookback_start_date = None entity_schema = _get_entity_schema(entity_df, config) @@ -211,6 +254,7 @@ def query_generator() -> Iterator[str]: use_cte=use_cte, start_date=start_date, end_date=end_date, + lookback_start_date=lookback_start_date, ) finally: # Only cleanup if we created a table @@ -425,6 +469,7 @@ def build_point_in_time_query( use_cte: bool = False, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + lookback_start_date: Optional[datetime] = None, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for PostgreSQL""" template = Environment(loader=BaseLoader()).from_string(source=query_template) @@ -455,6 +500,7 @@ def build_point_in_time_query( "use_cte": use_cte, "start_date": start_date, "end_date": end_date, + "lookback_start_date": lookback_start_date, } query = template.render(template_context) @@ -517,30 +563,68 @@ def _get_entity_schema( AND "{{ featureviews[0].timestamp_field }}" >= '{{ featureviews[0].min_event_timestamp }}' {% endif %} {% else %} +{# --- LOCF (Last Observation Carried Forward) path for multi-FV date-range --- #} + +{# Collect deduplicated entity list across all FVs #} +{% set all_entities = [] %} +{% for featureview in featureviews %} + {% for entity in featureview.entities %} + {% if entity not in all_entities %} + {% set _ = all_entities.append(entity) %} + {% endif %} + {% endfor %} +{% endfor %} + +{# Build list of output feature names per FV for consistent column ordering #} +{% set all_feature_cols = [] %} +{% for featureview in featureviews %} + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set _ = all_feature_cols.append(featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature)) %} + {% else %} + {% set _ = all_feature_cols.append(featureview.field_mapping.get(feature, feature)) %} + {% endif %} + {% endfor %} +{% endfor %} + WITH +{# --- Per-FV __data: pull feature rows from lookback_start_date..end_date --- #} {% for featureview in featureviews %} -"{{ featureview.name }}__data" AS ( +"{{ featureview.name }}__data_raw" AS ( SELECT "{{ featureview.timestamp_field }}" AS event_timestamp, - {% if featureview.created_timestamp_column %} - "{{ featureview.created_timestamp_column }}" AS created_timestamp, - {% endif %} {% for entity in featureview.entities %} "{{ entity }}", {% endfor %} {% for feature in featureview.features %} - "{{ feature }}" AS {% if full_feature_names %}"{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if not loop.last %},{% endif %} + "{{ feature }}" AS "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %} {% endfor %} + {% if featureview.created_timestamp_column %} + , "{{ featureview.created_timestamp_column }}" AS created_timestamp + {% endif %} FROM {{ featureview.table_subquery }} AS sub - WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ start_date }}' AND '{{ end_date }}' - {% if featureview.ttl != 0 and featureview.min_event_timestamp %} - AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' - {% endif %} + WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ lookback_start_date or start_date }}' AND '{{ end_date }}' ), +{% if featureview.created_timestamp_column %} +"{{ featureview.name }}__data" AS ( + SELECT * FROM ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}", {% endfor %}event_timestamp + ORDER BY created_timestamp DESC + ) AS __rn + FROM "{{ featureview.name }}__data_raw" + ) __dedup WHERE __rn = 1 +), +{% else %} +"{{ featureview.name }}__data" AS ( + SELECT * FROM "{{ featureview.name }}__data_raw" +), +{% endif %} {% endfor %} --- Create a base query with all unique entity + timestamp combinations -base_entities AS ( +{# --- Spine: prediction timeline = distinct (entity, timestamp) in [start_date, end_date] --- #} +spine AS ( {% for featureview in featureviews %} SELECT DISTINCT event_timestamp, @@ -548,58 +632,132 @@ def _get_entity_schema( "{{ entity }}"{% if not loop.last %},{% endif %} {% endfor %} FROM "{{ featureview.name }}__data" + WHERE event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION {% endif %} {% endfor %} -) +), + +{# --- CTE 1: Stack spine + feature rows via UNION ALL --- #} +stacked AS ( + {# Spine rows: is_spine=1, feature_anchor NULL, all features NULL, per-FV timestamps NULL #} + SELECT + event_timestamp, + {% for entity in all_entities %} + "{{ entity }}", + {% endfor %} + 1 AS is_spine, + NULL::int AS feature_anchor + {% for feat in all_feature_cols %} + , NULL AS "{{ feat }}" + {% endfor %} + {% for featureview in featureviews %} + , NULL::timestamptz AS "{{ featureview.name }}__feature_ts" + {% endfor %} + FROM spine -SELECT - base.event_timestamp, - {% set all_entities = [] %} {% for featureview in featureviews %} - {% for entity in featureview.entities %} - {% if entity not in all_entities %} - {% set _ = all_entities.append(entity) %} + UNION ALL + {# Feature rows for {{ featureview.name }}: is_spine=0, feature_anchor=1 #} + SELECT + event_timestamp, + {% for entity in all_entities %} + {% if entity in featureview.entities %} + "{{ entity }}", + {% else %} + NULL AS "{{ entity }}", + {% endif %} + {% endfor %} + 0 AS is_spine, + 1 AS feature_anchor + {# Emit this FV's features; NULL for other FVs' features #} + {% for fv_inner in featureviews %} + {% for feature in fv_inner.features %} + {% if full_feature_names %} + {% set col_name = fv_inner.name ~ '__' ~ fv_inner.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = fv_inner.field_mapping.get(feature, feature) %} + {% endif %} + {% if fv_inner.name == featureview.name %} + , "{{ col_name }}" + {% else %} + , NULL AS "{{ col_name }}" + {% endif %} + {% endfor %} + {% endfor %} + {# Per-FV feature timestamp: this FV gets event_timestamp, others NULL #} + {% for fv_inner in featureviews %} + {% if fv_inner.name == featureview.name %} + , event_timestamp AS "{{ fv_inner.name }}__feature_ts" + {% else %} + , NULL::timestamptz AS "{{ fv_inner.name }}__feature_ts" {% endif %} {% endfor %} + FROM "{{ featureview.name }}__data" {% endfor %} +), + +{# --- CTE 2: Group boundaries via COUNT (only feature rows increment) --- #} +stacked_with_group AS ( + SELECT *, + COUNT(feature_anchor) OVER ( + PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} + ORDER BY event_timestamp ASC, is_spine ASC + ) AS value_group_id + FROM stacked +), + +{# --- CTE 3: Forward-fill features + feature timestamps via FIRST_VALUE --- #} +filled AS ( + SELECT + event_timestamp, + {% for entity in all_entities %} + "{{ entity }}", + {% endfor %} + is_spine + {% for feat in all_feature_cols %} + , FIRST_VALUE("{{ feat }}") OVER ( + PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + ORDER BY event_timestamp ASC, is_spine ASC + ) AS "{{ feat }}" + {% endfor %} + {% for featureview in featureviews %} + , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( + PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + ORDER BY event_timestamp ASC, is_spine ASC + ) AS "{{ featureview.name }}__filled_ts" + {% endfor %} + FROM stacked_with_group +) + +{# --- CTE 4: Filter spine + TTL validation --- #} +SELECT + event_timestamp, {% for entity in all_entities %} - base."{{ entity }}", + "{{ entity }}", {% endfor %} {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} - {% set feature_counter = namespace(count=0) %} + {% set feat_idx = namespace(count=0) %} {% for featureview in featureviews %} - {% set outer_loop_index = loop.index0 %} {% for feature in featureview.features %} - {% set feature_counter.count = feature_counter.count + 1 %} - fv_{{ outer_loop_index }}."{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if feature_counter.count < total_features %},{% endif %} - {% endfor %} - {% endfor %} -FROM base_entities base -{% for featureview in featureviews %} -{% set outer_loop_index = loop.index0 %} -LEFT JOIN LATERAL ( - SELECT DISTINCT ON ({% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}) - event_timestamp, - {% for entity in featureview.entities %} - "{{ entity }}", - {% endfor %} - {% for feature in featureview.features %} - "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %} + {% set feat_idx.count = feat_idx.count + 1 %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + {% if featureview.ttl != 0 %} + CASE WHEN (event_timestamp - "{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second + THEN "{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + {% else %} + "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + {% endif %} {% endfor %} - FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }} - WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp - {% if featureview.ttl != 0 %} - AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second - {% endif %} - {% for entity in featureview.entities %} - AND fv_sub_{{ outer_loop_index }}."{{ entity }}" = base."{{ entity }}" {% endfor %} - ORDER BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, event_timestamp DESC -) AS fv_{{ outer_loop_index }} ON true -{% endfor %} -ORDER BY base.event_timestamp +FROM filled +WHERE is_spine = 1 +ORDER BY event_timestamp {% endif %} {% else %} WITH diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index 0cb0d98eeae..7184822b896 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -846,44 +846,143 @@ def test_sql_template_ttl_filtering(self): # Should not include TTL filtering when TTL is 0 or min_event_timestamp is None assert 'AND "event_timestamp" >=' not in query_no_ttl - def test_lateral_join_ttl_constraints(self): - """Test that LATERAL JOINs include proper TTL constraints""" - from jinja2 import BaseLoader, Environment + def test_locf_template_multi_fv_date_range(self): + """Test that multi-FV date-range uses LOCF (no LATERAL) with correct CTEs""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) - lateral_template = """ - FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }} - WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp - {% if featureview.ttl != 0 %} - AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second - {% endif %} - """ + fv_contexts = [ + { + "name": "fv1", + "ttl": 86400, + "entities": ["entity1_id"], + "features": ["feat_a"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"public"."fv1_table"', + "entity_selections": ['"entity1_id" AS "entity1_id"'], + "min_event_timestamp": "2023-01-01T00:00:00", + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "fv2", + "ttl": 3600, + "entities": ["entity1_id"], + "features": ["feat_b", "feat_c"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"public"."fv2_table"', + "entity_selections": ['"entity1_id" AS "entity1_id"'], + "min_event_timestamp": "2023-01-06T23:00:00", + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] - template = Environment(loader=BaseLoader()).from_string(source=lateral_template) + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + lookback = start - timedelta(seconds=86400) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=lookback, + ) - # Test with TTL - context = { - "featureview": { - "name": "user_features", - "ttl": 86400, # 1 day - }, - "outer_loop_index": 0, - } + # No LATERAL joins + assert "LATERAL" not in query + + # LOCF CTEs present + assert "stacked" in query.lower() + assert "stacked_with_group" in query.lower() + assert "filled" in query.lower() + assert "spine" in query.lower() - query = template.render(context) + # Correct ORDER BY for deterministic grouping + assert "is_spine ASC" in query + + # FIRST_VALUE used for forward-fill + assert "FIRST_VALUE" in query + + # TTL CASE for fv1 (ttl=86400) assert "86400 * interval" in query - assert "base.event_timestamp -" in query + # TTL CASE for fv2 (ttl=3600) + assert "3600 * interval" in query - # Test without TTL - context_no_ttl = { - "featureview": { - "name": "user_features", - "ttl": 0, # No TTL + # lookback_start_date used in feature __data range + assert str(lookback.date()) in query + + # is_spine filter + assert "is_spine = 1" in query + + def test_locf_template_no_ttl(self): + """Test LOCF template with TTL=0 does not emit CASE for TTL""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + fv_contexts = [ + { + "name": "fv1", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv1_t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, }, - "outer_loop_index": 0, - } + { + "name": "fv2", + "ttl": 0, + "entities": ["eid"], + "features": ["f2"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv2_t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] - query_no_ttl = template.render(context_no_ttl) - assert "interval" not in query_no_ttl + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # No LATERAL, no TTL CASE + assert "LATERAL" not in query + assert "CASE WHEN" not in query + assert "FIRST_VALUE" in query def test_api_non_entity_functionality(self): """Test that FeatureStore API accepts non-entity parameters correctly""" From 3da5af3ec2491b63ca10533b0c7e2298cb3aba89 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Wed, 4 Mar 2026 16:38:53 +0530 Subject: [PATCH 02/12] fix issues Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 115 +++++++++--------- .../postgres_offline_store/test_postgres.py | 22 +++- 2 files changed, 76 insertions(+), 61 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 39cdfd122d8..713d4591f6a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -639,103 +639,97 @@ def _get_entity_schema( {% endfor %} ), -{# --- CTE 1: Stack spine + feature rows via UNION ALL --- #} -stacked AS ( - {# Spine rows: is_spine=1, feature_anchor NULL, all features NULL, per-FV timestamps NULL #} +{# --- Per-FV independent LOCF forward-fill pipelines --- #} +{# Each FV is stacked, grouped, and filled independently to prevent #} +{# cross-FV interference when multiple FVs share overlapping timestamps. #} +{% for featureview in featureviews %} +"{{ featureview.name }}__stacked" AS ( SELECT - event_timestamp, + s.event_timestamp, {% for entity in all_entities %} - "{{ entity }}", + s."{{ entity }}", {% endfor %} 1 AS is_spine, NULL::int AS feature_anchor - {% for feat in all_feature_cols %} - , NULL AS "{{ feat }}" + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + , NULL AS "{{ col_name }}" {% endfor %} - {% for featureview in featureviews %} , NULL::timestamptz AS "{{ featureview.name }}__feature_ts" - {% endfor %} - FROM spine + FROM spine s - {% for featureview in featureviews %} UNION ALL - {# Feature rows for {{ featureview.name }}: is_spine=0, feature_anchor=1 #} + SELECT - event_timestamp, + d.event_timestamp, {% for entity in all_entities %} {% if entity in featureview.entities %} - "{{ entity }}", + d."{{ entity }}", {% else %} NULL AS "{{ entity }}", {% endif %} {% endfor %} 0 AS is_spine, 1 AS feature_anchor - {# Emit this FV's features; NULL for other FVs' features #} - {% for fv_inner in featureviews %} - {% for feature in fv_inner.features %} - {% if full_feature_names %} - {% set col_name = fv_inner.name ~ '__' ~ fv_inner.field_mapping.get(feature, feature) %} - {% else %} - {% set col_name = fv_inner.field_mapping.get(feature, feature) %} - {% endif %} - {% if fv_inner.name == featureview.name %} - , "{{ col_name }}" - {% else %} - , NULL AS "{{ col_name }}" - {% endif %} - {% endfor %} - {% endfor %} - {# Per-FV feature timestamp: this FV gets event_timestamp, others NULL #} - {% for fv_inner in featureviews %} - {% if fv_inner.name == featureview.name %} - , event_timestamp AS "{{ fv_inner.name }}__feature_ts" + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} {% else %} - , NULL::timestamptz AS "{{ fv_inner.name }}__feature_ts" + {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} + , d."{{ col_name }}" {% endfor %} - FROM "{{ featureview.name }}__data" - {% endfor %} + , d.event_timestamp AS "{{ featureview.name }}__feature_ts" + FROM "{{ featureview.name }}__data" d ), -{# --- CTE 2: Group boundaries via COUNT (only feature rows increment) --- #} -stacked_with_group AS ( +"{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} ORDER BY event_timestamp ASC, is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id - FROM stacked + FROM "{{ featureview.name }}__stacked" ), -{# --- CTE 3: Forward-fill features + feature timestamps via FIRST_VALUE --- #} -filled AS ( +"{{ featureview.name }}__filled" AS ( SELECT event_timestamp, {% for entity in all_entities %} "{{ entity }}", {% endfor %} is_spine - {% for feat in all_feature_cols %} - , FIRST_VALUE("{{ feat }}") OVER ( + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + , FIRST_VALUE("{{ col_name }}") OVER ( PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC - ) AS "{{ feat }}" + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS "{{ col_name }}" {% endfor %} - {% for featureview in featureviews %} , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ featureview.name }}__filled_ts" - {% endfor %} - FROM stacked_with_group -) + FROM "{{ featureview.name }}__grouped" +){% if not loop.last %},{% endif %} +{% endfor %} -{# --- CTE 4: Filter spine + TTL validation --- #} +{# --- Final: join per-FV filled results back onto spine --- #} SELECT - event_timestamp, + spine.event_timestamp, {% for entity in all_entities %} - "{{ entity }}", + spine."{{ entity }}", {% endfor %} {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} {% set feat_idx = namespace(count=0) %} @@ -748,16 +742,23 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} {% if featureview.ttl != 0 %} - CASE WHEN (event_timestamp - "{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second - THEN "{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second + THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} {% else %} - "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + "{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} {% endif %} {% endfor %} {% endfor %} -FROM filled -WHERE is_spine = 1 -ORDER BY event_timestamp +FROM spine +{% for featureview in featureviews %} +LEFT JOIN "{{ featureview.name }}__filled" AS "{{ featureview.name }}__f" + ON spine.event_timestamp = "{{ featureview.name }}__f".event_timestamp + {% for entity in all_entities %} + AND spine."{{ entity }}" IS NOT DISTINCT FROM "{{ featureview.name }}__f"."{{ entity }}" + {% endfor %} + AND "{{ featureview.name }}__f".is_spine = 1 +{% endfor %} +ORDER BY spine.event_timestamp {% endif %} {% else %} WITH diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index 7184822b896..d492732066e 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -903,18 +903,32 @@ def test_locf_template_multi_fv_date_range(self): # No LATERAL joins assert "LATERAL" not in query - # LOCF CTEs present - assert "stacked" in query.lower() - assert "stacked_with_group" in query.lower() - assert "filled" in query.lower() + # Per-FV LOCF CTEs present (independent forward-fill per feature view) + assert "fv1__stacked" in query + assert "fv1__grouped" in query + assert "fv1__filled" in query + assert "fv2__stacked" in query + assert "fv2__grouped" in query + assert "fv2__filled" in query assert "spine" in query.lower() + # No global stacked/stacked_with_group/filled (replaced by per-FV CTEs) + assert '"stacked_with_group"' not in query + # Correct ORDER BY for deterministic grouping assert "is_spine ASC" in query + # Explicit ROWS framing for deterministic window functions + assert "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" in query + # FIRST_VALUE used for forward-fill assert "FIRST_VALUE" in query + # Per-FV filled results joined back to spine + assert "LEFT JOIN" in query + assert '"fv1__f"' in query or "fv1__filled" in query + assert '"fv2__f"' in query or "fv2__filled" in query + # TTL CASE for fv1 (ttl=86400) assert "86400 * interval" in query # TTL CASE for fv2 (ttl=3600) From 34eed9aaab57416a36454bbe48a512cdd172740e Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Wed, 4 Mar 2026 16:40:05 +0530 Subject: [PATCH 03/12] fix lint Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../offline_stores/contrib/postgres_offline_store/postgres.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 713d4591f6a..81e763b9479 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -165,9 +165,7 @@ def get_historical_features( max_ttl_seconds = 0 for fv in feature_views: if fv.ttl and isinstance(fv.ttl, timedelta): - max_ttl_seconds = max( - max_ttl_seconds, int(fv.ttl.total_seconds()) - ) + max_ttl_seconds = max(max_ttl_seconds, int(fv.ttl.total_seconds())) lookback_start_date: Optional[datetime] = ( start_date - timedelta(seconds=max_ttl_seconds) if max_ttl_seconds > 0 From a4139b7e665a1bd4fe83216929947909de977a57 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Tue, 10 Mar 2026 02:39:14 +0530 Subject: [PATCH 04/12] featureview entities Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../contrib/postgres_offline_store/postgres.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 81e763b9479..4a85fa1a403 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -688,7 +688,7 @@ def _get_entity_schema( "{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( - PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id @@ -709,13 +709,13 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} , FIRST_VALUE("{{ col_name }}") OVER ( - PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ col_name }}" {% endfor %} , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( - PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ featureview.name }}__filled_ts" From 74a474c94ff89d05e15bba7f47d2259a694307e6 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Tue, 10 Mar 2026 03:01:31 +0530 Subject: [PATCH 05/12] fix LOCF spine UNION and empty-entity SQL Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 45 +++++++------------ 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 4a85fa1a403..77b2c700f73 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -622,15 +622,14 @@ def _get_entity_schema( {% endfor %} {# --- Spine: prediction timeline = distinct (entity, timestamp) in [start_date, end_date] --- #} +{# Each UNION branch selects all_entities so column count/order match; NULL for entities not in this FV. #} spine AS ( {% for featureview in featureviews %} SELECT DISTINCT - event_timestamp, - {% for entity in featureview.entities %} - "{{ entity }}"{% if not loop.last %},{% endif %} - {% endfor %} - FROM "{{ featureview.name }}__data" - WHERE event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' + d.event_timestamp{% for entity in all_entities %}, + {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %} + FROM "{{ featureview.name }}__data" d + WHERE d.event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION {% endif %} @@ -643,10 +642,8 @@ def _get_entity_schema( {% for featureview in featureviews %} "{{ featureview.name }}__stacked" AS ( SELECT - s.event_timestamp, - {% for entity in all_entities %} - s."{{ entity }}", - {% endfor %} + s.event_timestamp{% for entity in all_entities %}, + s."{{ entity }}"{% endfor %}, 1 AS is_spine, NULL::int AS feature_anchor {% for feature in featureview.features %} @@ -663,14 +660,8 @@ def _get_entity_schema( UNION ALL SELECT - d.event_timestamp, - {% for entity in all_entities %} - {% if entity in featureview.entities %} - d."{{ entity }}", - {% else %} - NULL AS "{{ entity }}", - {% endif %} - {% endfor %} + d.event_timestamp{% for entity in all_entities %}, + {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %}, 0 AS is_spine, 1 AS feature_anchor {% for feature in featureview.features %} @@ -688,7 +679,7 @@ def _get_entity_schema( "{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( - PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %} + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}1{% endif %} ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id @@ -697,10 +688,8 @@ def _get_entity_schema( "{{ featureview.name }}__filled" AS ( SELECT - event_timestamp, - {% for entity in all_entities %} - "{{ entity }}", - {% endfor %} + event_timestamp{% for entity in all_entities %}, + "{{ entity }}"{% endfor %}, is_spine {% for feature in featureview.features %} {% if full_feature_names %} @@ -709,13 +698,13 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} , FIRST_VALUE("{{ col_name }}") OVER ( - PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ col_name }}" {% endfor %} , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( - PARTITION BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS "{{ featureview.name }}__filled_ts" @@ -725,10 +714,8 @@ def _get_entity_schema( {# --- Final: join per-FV filled results back onto spine --- #} SELECT - spine.event_timestamp, - {% for entity in all_entities %} - spine."{{ entity }}", - {% endfor %} + spine.event_timestamp{% for entity in all_entities %}, + spine."{{ entity }}"{% endfor %}, {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} {% set feat_idx = namespace(count=0) %} {% for featureview in featureviews %} From 6f754b84e4b301cef357dbfc478917343a0fc6ec Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 16:50:25 +0530 Subject: [PATCH 06/12] added fixes Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 72 ++++++++----------- 1 file changed, 28 insertions(+), 44 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 77b2c700f73..48d866ce4e1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -139,17 +139,18 @@ def get_historical_features( end_date = _utc_now() else: end_date = make_tzaware(end_date) + # Find the maximum TTL across all feature views to ensure we capture enough data + max_ttl_seconds = max( + ( + int(fv.ttl.total_seconds()) + for fv in feature_views + if fv.ttl and isinstance(fv.ttl, timedelta) + ), + default=0, + ) # Calculate start_date from TTL if not provided - if start_date is None: - # Find the maximum TTL across all feature views to ensure we capture enough data - max_ttl_seconds = 0 - for fv in feature_views: - if fv.ttl and isinstance(fv.ttl, timedelta): - ttl_seconds = int(fv.ttl.total_seconds()) - max_ttl_seconds = max(max_ttl_seconds, ttl_seconds) - if max_ttl_seconds > 0: # Start from (end_date - max_ttl) to ensure we capture all relevant features start_date = end_date - timedelta(seconds=max_ttl_seconds) @@ -162,25 +163,18 @@ def get_historical_features( # Compute lookback_start_date for LOCF: pull feature data # from (start_date - max_ttl) so window functions can # forward-fill the last observation before start_date. - max_ttl_seconds = 0 - for fv in feature_views: - if fv.ttl and isinstance(fv.ttl, timedelta): - max_ttl_seconds = max(max_ttl_seconds, int(fv.ttl.total_seconds())) lookback_start_date: Optional[datetime] = ( start_date - timedelta(seconds=max_ttl_seconds) if max_ttl_seconds > 0 else start_date ) - entity_df = pd.DataFrame( - { - "event_timestamp": pd.date_range( - start=start_date, end=end_date, freq="1s", tz=timezone.utc - )[:1] # Just one row - } - ) + # Single row with end_date + entity_df = pd.DataFrame({"event_timestamp": [end_date]}) + skip_entity_upload = True else: lookback_start_date = None + skip_entity_upload = False entity_schema = _get_entity_schema(entity_df, config) @@ -211,7 +205,12 @@ def query_generator() -> Iterator[str]: and config.offline_store.entity_select_mode == EntitySelectMode.embed_query ) - if use_cte: + if skip_entity_upload: + # LOCF path never uses left_table + left_table_query_string = ( + "(SELECT NULL::timestamptz AS event_timestamp LIMIT 0)" + ) + elif use_cte: left_table_query_string = entity_df else: left_table_query_string = table_name @@ -255,9 +254,10 @@ def query_generator() -> Iterator[str]: lookback_start_date=lookback_start_date, ) finally: - # Only cleanup if we created a table + # Only cleanup if we created a table (not when skip_entity_upload) if ( - config.offline_store.entity_select_mode + not skip_entity_upload + and config.offline_store.entity_select_mode == EntitySelectMode.temp_table ): with _get_conn(config.offline_store) as conn, conn.cursor() as cur: @@ -573,18 +573,6 @@ def _get_entity_schema( {% endfor %} {% endfor %} -{# Build list of output feature names per FV for consistent column ordering #} -{% set all_feature_cols = [] %} -{% for featureview in featureviews %} - {% for feature in featureview.features %} - {% if full_feature_names %} - {% set _ = all_feature_cols.append(featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature)) %} - {% else %} - {% set _ = all_feature_cols.append(featureview.field_mapping.get(feature, feature)) %} - {% endif %} - {% endfor %} -{% endfor %} - WITH {# --- Per-FV __data: pull feature rows from lookback_start_date..end_date --- #} {% for featureview in featureviews %} @@ -614,10 +602,6 @@ def _get_entity_schema( FROM "{{ featureview.name }}__data_raw" ) __dedup WHERE __rn = 1 ), -{% else %} -"{{ featureview.name }}__data" AS ( - SELECT * FROM "{{ featureview.name }}__data_raw" -), {% endif %} {% endfor %} @@ -628,7 +612,7 @@ def _get_entity_schema( SELECT DISTINCT d.event_timestamp{% for entity in all_entities %}, {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %} - FROM "{{ featureview.name }}__data" d + FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d WHERE d.event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION @@ -673,13 +657,13 @@ def _get_entity_schema( , d."{{ col_name }}" {% endfor %} , d.event_timestamp AS "{{ featureview.name }}__feature_ts" - FROM "{{ featureview.name }}__data" d + FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d ), "{{ featureview.name }}__grouped" AS ( SELECT *, COUNT(feature_anchor) OVER ( - PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}1{% endif %} + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}(SELECT NULL){% endif %} ORDER BY event_timestamp ASC, is_spine ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS value_group_id @@ -727,7 +711,7 @@ def _get_entity_schema( {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} {% if featureview.ttl != 0 %} - CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second + CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= make_interval(secs => {{ featureview.ttl }}) THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} {% else %} "{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} @@ -823,7 +807,7 @@ def _get_entity_schema( FROM {{ featureview.table_subquery }} AS sub WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe) {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second + AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - make_interval(secs => {{ featureview.ttl }}) {% endif %} ), @@ -838,7 +822,7 @@ def _get_entity_schema( AND subquery.event_timestamp <= entity_dataframe.entity_timestamp {% if featureview.ttl == 0 %}{% else %} - AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - {{ featureview.ttl }} * interval '1' second + AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - make_interval(secs => {{ featureview.ttl }}) {% endif %} {% for entity in featureview.entities %} From 7594205af5e408da5db61a3a9af4e3f3345cda73 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 16:52:01 +0530 Subject: [PATCH 07/12] updates test Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/test_postgres.py | 224 +++++++++++++++++- 1 file changed, 220 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index d492732066e..ef9fac30039 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -929,10 +929,9 @@ def test_locf_template_multi_fv_date_range(self): assert '"fv1__f"' in query or "fv1__filled" in query assert '"fv2__f"' in query or "fv2__filled" in query - # TTL CASE for fv1 (ttl=86400) - assert "86400 * interval" in query - # TTL CASE for fv2 (ttl=3600) - assert "3600 * interval" in query + # TTL CASE for fv1 (ttl=86400) and fv2 (ttl=3600) use make_interval + assert "make_interval(secs => 86400)" in query + assert "make_interval(secs => 3600)" in query # lookback_start_date used in feature __data range assert str(lookback.date()) in query @@ -998,6 +997,223 @@ def test_locf_template_no_ttl(self): assert "CASE WHEN" not in query assert "FIRST_VALUE" in query + def test_locf_template_different_entity_sets(self): + """Different entity sets across FVs: NULL AS entity and IS NOT DISTINCT FROM.""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # fv1: driver_id, fv2: customer_id -> spine has both, each FV fills its own + fv_contexts = [ + { + "name": "driver_fv", + "ttl": 0, + "entities": ["driver_id"], + "features": ["score"], + "field_mapping": {"score": "score"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"driver_table"', + "entity_selections": ['"driver_id" AS "driver_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "customer_fv", + "ttl": 0, + "entities": ["customer_id"], + "features": ["amount"], + "field_mapping": {"amount": "amount"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"customer_table"', + "entity_selections": ['"customer_id" AS "customer_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # all_entities = driver_id, customer_id; each branch uses NULL for the other + assert 'NULL AS "driver_id"' in query or '"driver_id"' in query + assert 'NULL AS "customer_id"' in query or '"customer_id"' in query + assert "IS NOT DISTINCT FROM" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_entityless_feature_view(self): + """Entityless FV: PARTITION BY 1 fallback produces valid SQL (multi-FV LOCF path).""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # Two FVs so template uses LOCF path (single FV uses simple SELECT branch) + fv_contexts = [ + { + "name": "global_fv", + "ttl": 0, + "entities": [], + "features": ["global_feat"], + "field_mapping": {"global_feat": "global_feat"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"global_table"', + "entity_selections": [], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "driver_fv", + "ttl": 0, + "entities": ["driver_id"], + "features": ["score"], + "field_mapping": {"score": "score"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"driver_table"', + "entity_selections": ['"driver_id" AS "driver_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # Entityless FV uses PARTITION BY (SELECT NULL) for single partition (more standard than PARTITION BY 1) + assert "PARTITION BY (SELECT NULL)" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_with_created_timestamp_column(self): + """created_timestamp_column set: dedup CTE __data from __data_raw via ROW_NUMBER (multi-FV LOCF path).""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # Two FVs so template uses LOCF path (single FV uses simple SELECT branch) + fv_contexts = [ + { + "name": "fv_dedup", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {"f1": "f1"}, + "timestamp_field": "ts", + "created_timestamp_column": "created_ts", + "table_subquery": '"fv_table"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "fv2", + "ttl": 0, + "entities": ["eid"], + "features": ["f2"], + "field_mapping": {"f2": "f2"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv2_table"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + assert "fv_dedup__data_raw" in query + assert "fv_dedup__data" in query + assert "ROW_NUMBER()" in query + assert "__dedup" in query + assert "created_ts" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_full_feature_names(self): + """full_feature_names=True: featureview.name__feature column naming.""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + fv_contexts = [ + { + "name": "my_fv", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {"f1": "f1"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=True, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + assert "my_fv__f1" in query or '"my_fv__f1"' in query + sqlglot.parse(query, dialect="postgres") + def test_api_non_entity_functionality(self): """Test that FeatureStore API accepts non-entity parameters correctly""" from feast import FeatureStore From f770c41c065651ba441287664d262fe41da5e973 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 17:08:26 +0530 Subject: [PATCH 08/12] fix entity_df range Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../offline_stores/contrib/postgres_offline_store/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 48d866ce4e1..112a9a6d212 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -186,7 +186,7 @@ def get_historical_features( # min_event_timestamp (= range[0] - TTL) doesn't clip the window. # The synthetic entity_df only has end_date, which would wrongly # set min_event_timestamp to end_date - TTL instead of start_date - TTL. - if start_date is not None and end_date is not None: + if skip_entity_upload and start_date is not None and end_date is not None: entity_df_event_timestamp_range = (start_date, end_date) else: entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( From f235eb88987d32455fdd43b5ab11874ca40163f9 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 18:32:40 +0530 Subject: [PATCH 09/12] updated postgres.py Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../offline_stores/contrib/postgres_offline_store/postgres.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 112a9a6d212..46f4ec719ec 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -182,10 +182,6 @@ def get_historical_features( offline_utils.infer_event_timestamp_from_entity_df(entity_schema) ) - # In non-entity mode, use the actual requested range so that - # min_event_timestamp (= range[0] - TTL) doesn't clip the window. - # The synthetic entity_df only has end_date, which would wrongly - # set min_event_timestamp to end_date - TTL instead of start_date - TTL. if skip_entity_upload and start_date is not None and end_date is not None: entity_df_event_timestamp_range = (start_date, end_date) else: From 8c66e6f74aaae6fe63cd6329d13607756ce9d435 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Fri, 10 Apr 2026 16:12:26 +0530 Subject: [PATCH 10/12] fix-lint Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../contrib/postgres_offline_store/postgres.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 46f4ec719ec..cfbd62c6dfd 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -1,6 +1,6 @@ import contextlib from dataclasses import asdict -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from enum import Enum from typing import ( Any, @@ -46,7 +46,7 @@ from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pg_type_code_to_arrow -from feast.utils import compute_non_entity_date_range +from feast.utils import make_tzaware from .postgres_source import PostgreSQLSource @@ -136,7 +136,7 @@ def get_historical_features( if entity_df is None: # Default to current time if end_date not provided if end_date is None: - end_date = _utc_now() + end_date = datetime.now(tz=timezone.utc) else: end_date = make_tzaware(end_date) # Find the maximum TTL across all feature views to ensure we capture enough data From b6a4703379e7a71f4a2ccc6301834c82dfc3c911 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Thu, 16 Apr 2026 19:35:20 +0530 Subject: [PATCH 11/12] address reviews Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../postgres_offline_store/postgres.py | 45 +--- .../postgres_offline_store/test_postgres.py | 227 +++++++----------- 2 files changed, 100 insertions(+), 172 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index cfbd62c6dfd..55c60297534 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -46,7 +46,7 @@ from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pg_type_code_to_arrow -from feast.utils import make_tzaware +from feast.utils import compute_non_entity_date_range from .postgres_source import PostgreSQLSource @@ -134,12 +134,11 @@ def get_historical_features( # Handle non-entity retrieval mode if entity_df is None: - # Default to current time if end_date not provided - if end_date is None: - end_date = datetime.now(tz=timezone.utc) - else: - end_date = make_tzaware(end_date) - # Find the maximum TTL across all feature views to ensure we capture enough data + start_date, end_date = compute_non_entity_date_range( + feature_views, + start_date=start_date, + end_date=end_date, + ) max_ttl_seconds = max( ( int(fv.ttl.total_seconds()) @@ -148,43 +147,25 @@ def get_historical_features( ), default=0, ) - - # Calculate start_date from TTL if not provided - if start_date is None: - if max_ttl_seconds > 0: - # Start from (end_date - max_ttl) to ensure we capture all relevant features - start_date = end_date - timedelta(seconds=max_ttl_seconds) - else: - # If no TTL is set, default to 30 days before end_date - start_date = end_date - timedelta(days=30) - else: - start_date = make_tzaware(start_date) - - # Compute lookback_start_date for LOCF: pull feature data - # from (start_date - max_ttl) so window functions can - # forward-fill the last observation before start_date. lookback_start_date: Optional[datetime] = ( start_date - timedelta(seconds=max_ttl_seconds) if max_ttl_seconds > 0 else start_date ) - - # Single row with end_date - entity_df = pd.DataFrame({"event_timestamp": [end_date]}) skip_entity_upload = True else: lookback_start_date = None skip_entity_upload = False - entity_schema = _get_entity_schema(entity_df, config) - - entity_df_event_timestamp_col = ( - offline_utils.infer_event_timestamp_from_entity_df(entity_schema) - ) - - if skip_entity_upload and start_date is not None and end_date is not None: + if skip_entity_upload: + entity_schema: Dict[str, Any] = {"event_timestamp": "timestamp"} + entity_df_event_timestamp_col = "event_timestamp" entity_df_event_timestamp_range = (start_date, end_date) else: + entity_schema = _get_entity_schema(entity_df, config) + entity_df_event_timestamp_col = ( + offline_utils.infer_event_timestamp_from_entity_df(entity_schema) + ) entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( entity_df, entity_df_event_timestamp_col, diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index ef9fac30039..e401b0a31fd 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -574,54 +574,41 @@ def test_non_entity_mode_with_both_dates(self): start_date = datetime(2023, 1, 1, tzinfo=timezone.utc) end_date = datetime(2023, 1, 7, tzinfo=timezone.utc) - # This should not raise an error - validates API signature - with patch.multiple( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", - _get_conn=MagicMock(), - _upload_entity_df=MagicMock(), - _get_entity_schema=MagicMock(return_value={"event_timestamp": "timestamp"}), - _get_entity_df_event_timestamp_range=MagicMock( - return_value=(start_date, end_date) + with ( + patch.multiple( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", + _get_conn=MagicMock(), ), - ): - with patch( + patch( "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys", return_value=[], - ): - with patch( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df" - ): - with patch( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", - return_value=[], - ): - try: - retrieval_job = ( - PostgreSQLOfflineStore.get_historical_features( - config=test_repo_config, - feature_views=[feature_view], - feature_refs=["test_fv:feature1"], - entity_df=None, # Non-entity mode - registry=MagicMock(), - project="test_project", - start_date=start_date, - end_date=end_date, - ) - ) - assert isinstance(retrieval_job, RetrievalJob) - except Exception as e: - # Should not fail due to API signature issues - assert "entity_df" not in str(e) - assert "start_date" not in str(e) - assert "end_date" not in str(e) - - def test_non_entity_entity_df_uses_end_date(self): - """Test that the synthetic entity_df uses end_date, not start_date. - - Regression test: the old code used pd.date_range(start=start_date, ...)[:1] - which put start_date in the entity_df. Since PIT joins use - MAX(entity_timestamp) as the upper bound, start_date made end_date - unreachable. The fix uses [end_date] directly. + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df", + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", + return_value=[], + ), + ): + retrieval_job = PostgreSQLOfflineStore.get_historical_features( + config=test_repo_config, + feature_views=[feature_view], + feature_refs=["test_fv:feature1"], + entity_df=None, + registry=MagicMock(), + project="test_project", + start_date=start_date, + end_date=end_date, + ) + assert isinstance(retrieval_job, RetrievalJob) + + def test_non_entity_uses_end_date_as_max_timestamp(self): + """Test that the non-entity path uses end_date as max_event_timestamp. + + The LOCF path skips entity_df creation entirely and sets + entity_df_event_timestamp_range = (start_date, end_date) directly, + so end_date is always the upper bound for feature retrieval. """ test_repo_config = RepoConfig( project="test_project", @@ -634,19 +621,10 @@ def test_non_entity_entity_df_uses_end_date(self): start_date = datetime(2023, 1, 1, tzinfo=timezone.utc) end_date = datetime(2023, 1, 7, tzinfo=timezone.utc) - mock_get_entity_schema = MagicMock( - return_value={"event_timestamp": "timestamp"} - ) - with ( patch.multiple( "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", _get_conn=MagicMock(), - _upload_entity_df=MagicMock(), - _get_entity_schema=mock_get_entity_schema, - _get_entity_df_event_timestamp_range=MagicMock( - return_value=(start_date, end_date) - ), ), patch( "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys", @@ -660,7 +638,7 @@ def test_non_entity_entity_df_uses_end_date(self): return_value=[], ), ): - PostgreSQLOfflineStore.get_historical_features( + retrieval_job = PostgreSQLOfflineStore.get_historical_features( config=test_repo_config, feature_views=[feature_view], feature_refs=["test_fv:feature1"], @@ -671,14 +649,8 @@ def test_non_entity_entity_df_uses_end_date(self): end_date=end_date, ) - # _get_entity_schema is called with the synthetic entity_df - df = mock_get_entity_schema.call_args[0][0] - assert len(df) == 1 - ts = df["event_timestamp"].iloc[0] - # The entity_df must use end_date, not start_date - assert ts == end_date, ( - f"entity_df timestamp should be end_date ({end_date}), got {ts}" - ) + assert retrieval_job.metadata.max_event_timestamp == end_date + assert retrieval_job.metadata.min_event_timestamp == start_date def test_non_entity_mode_with_end_date_only(self): """Test non-entity retrieval calculates start_date from TTL""" @@ -695,53 +667,42 @@ def test_non_entity_mode_with_end_date_only(self): ] end_date = datetime(2023, 1, 7, tzinfo=timezone.utc) - with patch.multiple( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", - _get_conn=MagicMock(), - _upload_entity_df=MagicMock(), - _get_entity_schema=MagicMock(return_value={"event_timestamp": "timestamp"}), - _get_entity_df_event_timestamp_range=MagicMock( - return_value=(datetime(2023, 1, 6, tzinfo=timezone.utc), end_date) + with ( + patch.multiple( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", + _get_conn=MagicMock(), ), - ): - with patch( + patch( "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys", return_value=[], - ): - with patch( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df" - ): - with patch( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", - return_value=[], - ): - try: - retrieval_job = ( - PostgreSQLOfflineStore.get_historical_features( - config=test_repo_config, - feature_views=feature_views, - feature_refs=[ - "user_fv:age", - "transaction_fv:amount", - ], - entity_df=None, # Non-entity mode - registry=MagicMock(), - project="test_project", - end_date=end_date, - # start_date not provided - should be calculated from max TTL - ) - ) - assert isinstance(retrieval_job, RetrievalJob) - except Exception as e: - # Should not fail due to TTL calculation issues - assert "ttl" not in str(e).lower() + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df", + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", + return_value=[], + ), + ): + retrieval_job = PostgreSQLOfflineStore.get_historical_features( + config=test_repo_config, + feature_views=feature_views, + feature_refs=["user_fv:age", "transaction_fv:amount"], + entity_df=None, + registry=MagicMock(), + project="test_project", + end_date=end_date, + ) + assert isinstance(retrieval_job, RetrievalJob) + expected_start = end_date - timedelta(days=1) + assert retrieval_job.metadata.min_event_timestamp == expected_start @patch("feast.utils.datetime") def test_no_dates_provided_defaults_to_current_time(self, mock_datetime): """Test that when no dates are provided, end_date defaults to current time""" - # Mock datetime.now() to return a fixed time fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc) mock_datetime.now.return_value = fixed_now + mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw) test_repo_config = RepoConfig( project="test_project", @@ -752,48 +713,34 @@ def test_no_dates_provided_defaults_to_current_time(self, mock_datetime): feature_view = _mock_feature_view("test_fv", ttl=timedelta(days=1)) - with patch.multiple( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", - _get_conn=MagicMock(), - _upload_entity_df=MagicMock(), - _get_entity_schema=MagicMock(return_value={"event_timestamp": "timestamp"}), - _get_entity_df_event_timestamp_range=MagicMock( - return_value=( - datetime(2023, 1, 6, 12, 0, 0, tzinfo=timezone.utc), - fixed_now, - ) + with ( + patch.multiple( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", + _get_conn=MagicMock(), ), - ): - with patch( + patch( "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys", return_value=[], - ): - with patch( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df" - ): - with patch( - "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", - return_value=[], - ): - try: - retrieval_job = ( - PostgreSQLOfflineStore.get_historical_features( - config=test_repo_config, - feature_views=[feature_view], - feature_refs=["test_fv:feature1"], - entity_df=None, # Non-entity mode - registry=MagicMock(), - project="test_project", - # No start_date or end_date provided - ) - ) - - # Verify that datetime.now() was called to get current time - mock_datetime.now.assert_called_with(tz=timezone.utc) - assert isinstance(retrieval_job, RetrievalJob) - except Exception as e: - # Should not fail due to datetime issues - assert "datetime" not in str(e).lower() + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df", + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", + return_value=[], + ), + ): + retrieval_job = PostgreSQLOfflineStore.get_historical_features( + config=test_repo_config, + feature_views=[feature_view], + feature_refs=["test_fv:feature1"], + entity_df=None, + registry=MagicMock(), + project="test_project", + ) + + mock_datetime.now.assert_called_with(tz=timezone.utc) + assert isinstance(retrieval_job, RetrievalJob) def test_sql_template_ttl_filtering(self): """Test that the SQL template includes proper TTL filtering""" From 69565bbe8bc79d49513f867f8cb9c5cae6e0e180 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Thu, 16 Apr 2026 20:00:32 +0530 Subject: [PATCH 12/12] lint-fix Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../contrib/postgres_offline_store/postgres.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 55c60297534..5b14a86dec8 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -158,9 +158,13 @@ def get_historical_features( skip_entity_upload = False if skip_entity_upload: + assert start_date is not None and end_date is not None entity_schema: Dict[str, Any] = {"event_timestamp": "timestamp"} entity_df_event_timestamp_col = "event_timestamp" - entity_df_event_timestamp_range = (start_date, end_date) + entity_df_event_timestamp_range: Tuple[datetime, datetime] = ( + start_date, + end_date, + ) else: entity_schema = _get_entity_schema(entity_df, config) entity_df_event_timestamp_col = ( @@ -188,6 +192,7 @@ def query_generator() -> Iterator[str]: "(SELECT NULL::timestamptz AS event_timestamp LIMIT 0)" ) elif use_cte: + assert isinstance(entity_df, str) left_table_query_string = entity_df else: left_table_query_string = table_name