diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 57db2c471d7..4ace5eb3b9e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -139,17 +139,18 @@ def get_historical_features( end_date = _utc_now() else: end_date = make_tzaware(end_date) + # Find the maximum TTL across all feature views to ensure we capture enough data + max_ttl_seconds = max( + ( + int(fv.ttl.total_seconds()) + for fv in feature_views + if fv.ttl and isinstance(fv.ttl, timedelta) + ), + default=0, + ) # Calculate start_date from TTL if not provided - if start_date is None: - # Find the maximum TTL across all feature views to ensure we capture enough data - max_ttl_seconds = 0 - for fv in feature_views: - if fv.ttl and isinstance(fv.ttl, timedelta): - ttl_seconds = int(fv.ttl.total_seconds()) - max_ttl_seconds = max(max_ttl_seconds, ttl_seconds) - if max_ttl_seconds > 0: # Start from (end_date - max_ttl) to ensure we capture all relevant features start_date = end_date - timedelta(seconds=max_ttl_seconds) @@ -161,6 +162,13 @@ def get_historical_features( entity_df = pd.DataFrame({"event_timestamp": [end_date]}) + # Single row with end_date + entity_df = pd.DataFrame({"event_timestamp": [end_date]}) + skip_entity_upload = True + else: + lookback_start_date = None + skip_entity_upload = False + entity_schema = _get_entity_schema(entity_df, config) entity_df_event_timestamp_col = ( @@ -190,7 +198,12 @@ def query_generator() -> Iterator[str]: and config.offline_store.entity_select_mode == EntitySelectMode.embed_query ) - if use_cte: + if skip_entity_upload: + # LOCF path never uses left_table + left_table_query_string = ( + "(SELECT NULL::timestamptz AS event_timestamp LIMIT 0)" + ) + elif use_cte: left_table_query_string = entity_df else: left_table_query_string = table_name @@ -231,11 +244,13 @@ def query_generator() -> Iterator[str]: use_cte=use_cte, start_date=start_date, end_date=end_date, + lookback_start_date=lookback_start_date, ) finally: - # Only cleanup if we created a table + # Only cleanup if we created a table (not when skip_entity_upload) if ( - config.offline_store.entity_select_mode + not skip_entity_upload + and config.offline_store.entity_select_mode == EntitySelectMode.temp_table ): with _get_conn(config.offline_store) as conn, conn.cursor() as cur: @@ -445,6 +460,7 @@ def build_point_in_time_query( use_cte: bool = False, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + lookback_start_date: Optional[datetime] = None, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for PostgreSQL""" template = Environment(loader=BaseLoader()).from_string(source=query_template) @@ -475,6 +491,7 @@ def build_point_in_time_query( "use_cte": use_cte, "start_date": start_date, "end_date": end_date, + "lookback_start_date": lookback_start_date, } query = template.render(template_context) @@ -537,89 +554,173 @@ def _get_entity_schema( AND "{{ featureviews[0].timestamp_field }}" >= '{{ featureviews[0].min_event_timestamp }}' {% endif %} {% else %} +{# --- LOCF (Last Observation Carried Forward) path for multi-FV date-range --- #} + +{# Collect deduplicated entity list across all FVs #} +{% set all_entities = [] %} +{% for featureview in featureviews %} + {% for entity in featureview.entities %} + {% if entity not in all_entities %} + {% set _ = all_entities.append(entity) %} + {% endif %} + {% endfor %} +{% endfor %} + WITH +{# --- Per-FV __data: pull feature rows from lookback_start_date..end_date --- #} {% for featureview in featureviews %} -"{{ featureview.name }}__data" AS ( +"{{ featureview.name }}__data_raw" AS ( SELECT "{{ featureview.timestamp_field }}" AS event_timestamp, - {% if featureview.created_timestamp_column %} - "{{ featureview.created_timestamp_column }}" AS created_timestamp, - {% endif %} {% for entity in featureview.entities %} "{{ entity }}", {% endfor %} {% for feature in featureview.features %} - "{{ feature }}" AS {% if full_feature_names %}"{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if not loop.last %},{% endif %} + "{{ feature }}" AS "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %} {% endfor %} + {% if featureview.created_timestamp_column %} + , "{{ featureview.created_timestamp_column }}" AS created_timestamp + {% endif %} FROM {{ featureview.table_subquery }} AS sub - WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ start_date }}' AND '{{ end_date }}' - {% if featureview.ttl != 0 and featureview.min_event_timestamp %} - AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' - {% endif %} + WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ lookback_start_date or start_date }}' AND '{{ end_date }}' +), +{% if featureview.created_timestamp_column %} +"{{ featureview.name }}__data" AS ( + SELECT * FROM ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY {% for entity in featureview.entities %}"{{ entity }}", {% endfor %}event_timestamp + ORDER BY created_timestamp DESC + ) AS __rn + FROM "{{ featureview.name }}__data_raw" + ) __dedup WHERE __rn = 1 ), +{% endif %} {% endfor %} --- Create a base query with all unique entity + timestamp combinations -base_entities AS ( +{# --- Spine: prediction timeline = distinct (entity, timestamp) in [start_date, end_date] --- #} +{# Each UNION branch selects all_entities so column count/order match; NULL for entities not in this FV. #} +spine AS ( {% for featureview in featureviews %} SELECT DISTINCT - event_timestamp, - {% for entity in featureview.entities %} - "{{ entity }}"{% if not loop.last %},{% endif %} - {% endfor %} - FROM "{{ featureview.name }}__data" + d.event_timestamp{% for entity in all_entities %}, + {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %} + FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d + WHERE d.event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}' {% if not loop.last %} UNION {% endif %} {% endfor %} -) +), -SELECT - base.event_timestamp, - {% set all_entities = [] %} - {% for featureview in featureviews %} - {% for entity in featureview.entities %} - {% if entity not in all_entities %} - {% set _ = all_entities.append(entity) %} +{# --- Per-FV independent LOCF forward-fill pipelines --- #} +{# Each FV is stacked, grouped, and filled independently to prevent #} +{# cross-FV interference when multiple FVs share overlapping timestamps. #} +{% for featureview in featureviews %} +"{{ featureview.name }}__stacked" AS ( + SELECT + s.event_timestamp{% for entity in all_entities %}, + s."{{ entity }}"{% endfor %}, + 1 AS is_spine, + NULL::int AS feature_anchor + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} {% endif %} + , NULL AS "{{ col_name }}" {% endfor %} - {% endfor %} - {% for entity in all_entities %} - base."{{ entity }}", - {% endfor %} + , NULL::timestamptz AS "{{ featureview.name }}__feature_ts" + FROM spine s + + UNION ALL + + SELECT + d.event_timestamp{% for entity in all_entities %}, + {% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %}, + 0 AS is_spine, + 1 AS feature_anchor + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + , d."{{ col_name }}" + {% endfor %} + , d.event_timestamp AS "{{ featureview.name }}__feature_ts" + FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d +), + +"{{ featureview.name }}__grouped" AS ( + SELECT *, + COUNT(feature_anchor) OVER ( + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}(SELECT NULL){% endif %} + ORDER BY event_timestamp ASC, is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_group_id + FROM "{{ featureview.name }}__stacked" +), + +"{{ featureview.name }}__filled" AS ( + SELECT + event_timestamp{% for entity in all_entities %}, + "{{ entity }}"{% endfor %}, + is_spine + {% for feature in featureview.features %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + , FIRST_VALUE("{{ col_name }}") OVER ( + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id + ORDER BY event_timestamp ASC, is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS "{{ col_name }}" + {% endfor %} + , FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER ( + PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id + ORDER BY event_timestamp ASC, is_spine ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS "{{ featureview.name }}__filled_ts" + FROM "{{ featureview.name }}__grouped" +){% if not loop.last %},{% endif %} +{% endfor %} + +{# --- Final: join per-FV filled results back onto spine --- #} +SELECT + spine.event_timestamp{% for entity in all_entities %}, + spine."{{ entity }}"{% endfor %}, {% set total_features = featureviews|map(attribute='features')|map('length')|sum %} - {% set feature_counter = namespace(count=0) %} + {% set feat_idx = namespace(count=0) %} {% for featureview in featureviews %} - {% set outer_loop_index = loop.index0 %} {% for feature in featureview.features %} - {% set feature_counter.count = feature_counter.count + 1 %} - fv_{{ outer_loop_index }}."{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if feature_counter.count < total_features %},{% endif %} + {% set feat_idx.count = feat_idx.count + 1 %} + {% if full_feature_names %} + {% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %} + {% else %} + {% set col_name = featureview.field_mapping.get(feature, feature) %} + {% endif %} + {% if featureview.ttl != 0 %} + CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= make_interval(secs => {{ featureview.ttl }}) + THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + {% else %} + "{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %} + {% endif %} {% endfor %} {% endfor %} -FROM base_entities base +FROM spine {% for featureview in featureviews %} -{% set outer_loop_index = loop.index0 %} -LEFT JOIN LATERAL ( - SELECT DISTINCT ON ({% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}) - event_timestamp, - {% for entity in featureview.entities %} - "{{ entity }}", - {% endfor %} - {% for feature in featureview.features %} - "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %} - {% endfor %} - FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }} - WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp - {% if featureview.ttl != 0 %} - AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second - {% endif %} - {% for entity in featureview.entities %} - AND fv_sub_{{ outer_loop_index }}."{{ entity }}" = base."{{ entity }}" +LEFT JOIN "{{ featureview.name }}__filled" AS "{{ featureview.name }}__f" + ON spine.event_timestamp = "{{ featureview.name }}__f".event_timestamp + {% for entity in all_entities %} + AND spine."{{ entity }}" IS NOT DISTINCT FROM "{{ featureview.name }}__f"."{{ entity }}" {% endfor %} - ORDER BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, event_timestamp DESC -) AS fv_{{ outer_loop_index }} ON true + AND "{{ featureview.name }}__f".is_spine = 1 {% endfor %} -ORDER BY base.event_timestamp +ORDER BY spine.event_timestamp {% endif %} {% else %} WITH @@ -699,7 +800,7 @@ def _get_entity_schema( FROM {{ featureview.table_subquery }} AS sub WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe) {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second + AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - make_interval(secs => {{ featureview.ttl }}) {% endif %} ), @@ -714,7 +815,7 @@ def _get_entity_schema( AND subquery.event_timestamp <= entity_dataframe.entity_timestamp {% if featureview.ttl == 0 %}{% else %} - AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - {{ featureview.ttl }} * interval '1' second + AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - make_interval(secs => {{ featureview.ttl }}) {% endif %} {% for entity in featureview.entities %} diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index 0cb0d98eeae..ef9fac30039 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -846,44 +846,373 @@ def test_sql_template_ttl_filtering(self): # Should not include TTL filtering when TTL is 0 or min_event_timestamp is None assert 'AND "event_timestamp" >=' not in query_no_ttl - def test_lateral_join_ttl_constraints(self): - """Test that LATERAL JOINs include proper TTL constraints""" - from jinja2 import BaseLoader, Environment + def test_locf_template_multi_fv_date_range(self): + """Test that multi-FV date-range uses LOCF (no LATERAL) with correct CTEs""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) - lateral_template = """ - FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }} - WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp - {% if featureview.ttl != 0 %} - AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second - {% endif %} - """ + fv_contexts = [ + { + "name": "fv1", + "ttl": 86400, + "entities": ["entity1_id"], + "features": ["feat_a"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"public"."fv1_table"', + "entity_selections": ['"entity1_id" AS "entity1_id"'], + "min_event_timestamp": "2023-01-01T00:00:00", + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "fv2", + "ttl": 3600, + "entities": ["entity1_id"], + "features": ["feat_b", "feat_c"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"public"."fv2_table"', + "entity_selections": ['"entity1_id" AS "entity1_id"'], + "min_event_timestamp": "2023-01-06T23:00:00", + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] - template = Environment(loader=BaseLoader()).from_string(source=lateral_template) + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + lookback = start - timedelta(seconds=86400) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=lookback, + ) - # Test with TTL - context = { - "featureview": { - "name": "user_features", - "ttl": 86400, # 1 day + # No LATERAL joins + assert "LATERAL" not in query + + # Per-FV LOCF CTEs present (independent forward-fill per feature view) + assert "fv1__stacked" in query + assert "fv1__grouped" in query + assert "fv1__filled" in query + assert "fv2__stacked" in query + assert "fv2__grouped" in query + assert "fv2__filled" in query + assert "spine" in query.lower() + + # No global stacked/stacked_with_group/filled (replaced by per-FV CTEs) + assert '"stacked_with_group"' not in query + + # Correct ORDER BY for deterministic grouping + assert "is_spine ASC" in query + + # Explicit ROWS framing for deterministic window functions + assert "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" in query + + # FIRST_VALUE used for forward-fill + assert "FIRST_VALUE" in query + + # Per-FV filled results joined back to spine + assert "LEFT JOIN" in query + assert '"fv1__f"' in query or "fv1__filled" in query + assert '"fv2__f"' in query or "fv2__filled" in query + + # TTL CASE for fv1 (ttl=86400) and fv2 (ttl=3600) use make_interval + assert "make_interval(secs => 86400)" in query + assert "make_interval(secs => 3600)" in query + + # lookback_start_date used in feature __data range + assert str(lookback.date()) in query + + # is_spine filter + assert "is_spine = 1" in query + + def test_locf_template_no_ttl(self): + """Test LOCF template with TTL=0 does not emit CASE for TTL""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + fv_contexts = [ + { + "name": "fv1", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv1_t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, }, - "outer_loop_index": 0, - } + { + "name": "fv2", + "ttl": 0, + "entities": ["eid"], + "features": ["f2"], + "field_mapping": {}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv2_t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] - query = template.render(context) - assert "86400 * interval" in query - assert "base.event_timestamp -" in query + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) - # Test without TTL - context_no_ttl = { - "featureview": { - "name": "user_features", - "ttl": 0, # No TTL + # No LATERAL, no TTL CASE + assert "LATERAL" not in query + assert "CASE WHEN" not in query + assert "FIRST_VALUE" in query + + def test_locf_template_different_entity_sets(self): + """Different entity sets across FVs: NULL AS entity and IS NOT DISTINCT FROM.""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # fv1: driver_id, fv2: customer_id -> spine has both, each FV fills its own + fv_contexts = [ + { + "name": "driver_fv", + "ttl": 0, + "entities": ["driver_id"], + "features": ["score"], + "field_mapping": {"score": "score"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"driver_table"', + "entity_selections": ['"driver_id" AS "driver_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, }, - "outer_loop_index": 0, - } + { + "name": "customer_fv", + "ttl": 0, + "entities": ["customer_id"], + "features": ["amount"], + "field_mapping": {"amount": "amount"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"customer_table"', + "entity_selections": ['"customer_id" AS "customer_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) - query_no_ttl = template.render(context_no_ttl) - assert "interval" not in query_no_ttl + # all_entities = driver_id, customer_id; each branch uses NULL for the other + assert 'NULL AS "driver_id"' in query or '"driver_id"' in query + assert 'NULL AS "customer_id"' in query or '"customer_id"' in query + assert "IS NOT DISTINCT FROM" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_entityless_feature_view(self): + """Entityless FV: PARTITION BY 1 fallback produces valid SQL (multi-FV LOCF path).""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # Two FVs so template uses LOCF path (single FV uses simple SELECT branch) + fv_contexts = [ + { + "name": "global_fv", + "ttl": 0, + "entities": [], + "features": ["global_feat"], + "field_mapping": {"global_feat": "global_feat"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"global_table"', + "entity_selections": [], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "driver_fv", + "ttl": 0, + "entities": ["driver_id"], + "features": ["score"], + "field_mapping": {"score": "score"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"driver_table"', + "entity_selections": ['"driver_id" AS "driver_id"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + # Entityless FV uses PARTITION BY (SELECT NULL) for single partition (more standard than PARTITION BY 1) + assert "PARTITION BY (SELECT NULL)" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_with_created_timestamp_column(self): + """created_timestamp_column set: dedup CTE __data from __data_raw via ROW_NUMBER (multi-FV LOCF path).""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + # Two FVs so template uses LOCF path (single FV uses simple SELECT branch) + fv_contexts = [ + { + "name": "fv_dedup", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {"f1": "f1"}, + "timestamp_field": "ts", + "created_timestamp_column": "created_ts", + "table_subquery": '"fv_table"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + { + "name": "fv2", + "ttl": 0, + "entities": ["eid"], + "features": ["f2"], + "field_mapping": {"f2": "f2"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"fv2_table"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=False, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + assert "fv_dedup__data_raw" in query + assert "fv_dedup__data" in query + assert "ROW_NUMBER()" in query + assert "__dedup" in query + assert "created_ts" in query + sqlglot.parse(query, dialect="postgres") + + def test_locf_template_full_feature_names(self): + """full_feature_names=True: featureview.name__feature column naming.""" + from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + build_point_in_time_query, + ) + + fv_contexts = [ + { + "name": "my_fv", + "ttl": 0, + "entities": ["eid"], + "features": ["f1"], + "field_mapping": {"f1": "f1"}, + "timestamp_field": "ts", + "created_timestamp_column": None, + "table_subquery": '"t"', + "entity_selections": ['"eid" AS "eid"'], + "min_event_timestamp": None, + "max_event_timestamp": "2023-01-07T00:00:00", + "date_partition_column": None, + }, + ] + start = datetime(2023, 1, 1, tzinfo=timezone.utc) + end = datetime(2023, 1, 7, tzinfo=timezone.utc) + + query = build_point_in_time_query( + fv_contexts, + left_table_query_string="unused", + entity_df_event_timestamp_col="event_timestamp", + entity_df_columns={"event_timestamp": None}.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=True, + start_date=start, + end_date=end, + lookback_start_date=start, + ) + + assert "my_fv__f1" in query or '"my_fv__f1"' in query + sqlglot.parse(query, dialect="postgres") def test_api_non_entity_functionality(self): """Test that FeatureStore API accepts non-entity parameters correctly"""