Skip to content
Open
Prev Previous commit
Next Next commit
fix issues
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com>

rh-pre-commit.version: 2.3.2
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
Vperiodt committed Apr 10, 2026
commit 3da5af3ec2491b63ca10533b0c7e2298cb3aba89
Original file line number Diff line number Diff line change
Expand Up @@ -639,103 +639,97 @@ def _get_entity_schema(
{% endfor %}
),

{# --- CTE 1: Stack spine + feature rows via UNION ALL --- #}
stacked AS (
{# Spine rows: is_spine=1, feature_anchor NULL, all features NULL, per-FV timestamps NULL #}
{# --- Per-FV independent LOCF forward-fill pipelines --- #}
{# Each FV is stacked, grouped, and filled independently to prevent #}
{# cross-FV interference when multiple FVs share overlapping timestamps. #}
{% for featureview in featureviews %}
"{{ featureview.name }}__stacked" AS (
SELECT
event_timestamp,
s.event_timestamp,
{% for entity in all_entities %}
"{{ entity }}",
s."{{ entity }}",
{% endfor %}
1 AS is_spine,
NULL::int AS feature_anchor
{% for feat in all_feature_cols %}
, NULL AS "{{ feat }}"
{% for feature in featureview.features %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
, NULL AS "{{ col_name }}"
{% endfor %}
{% for featureview in featureviews %}
, NULL::timestamptz AS "{{ featureview.name }}__feature_ts"
{% endfor %}
FROM spine
FROM spine s

{% for featureview in featureviews %}
UNION ALL
{# Feature rows for {{ featureview.name }}: is_spine=0, feature_anchor=1 #}

SELECT
event_timestamp,
d.event_timestamp,
{% for entity in all_entities %}
{% if entity in featureview.entities %}
"{{ entity }}",
d."{{ entity }}",
{% else %}
NULL AS "{{ entity }}",
{% endif %}
{% endfor %}
0 AS is_spine,
1 AS feature_anchor
{# Emit this FV's features; NULL for other FVs' features #}
{% for fv_inner in featureviews %}
{% for feature in fv_inner.features %}
{% if full_feature_names %}
{% set col_name = fv_inner.name ~ '__' ~ fv_inner.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = fv_inner.field_mapping.get(feature, feature) %}
{% endif %}
{% if fv_inner.name == featureview.name %}
, "{{ col_name }}"
{% else %}
, NULL AS "{{ col_name }}"
{% endif %}
{% endfor %}
{% endfor %}
{# Per-FV feature timestamp: this FV gets event_timestamp, others NULL #}
{% for fv_inner in featureviews %}
{% if fv_inner.name == featureview.name %}
, event_timestamp AS "{{ fv_inner.name }}__feature_ts"
{% for feature in featureview.features %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
, NULL::timestamptz AS "{{ fv_inner.name }}__feature_ts"
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
, d."{{ col_name }}"
{% endfor %}
FROM "{{ featureview.name }}__data"
{% endfor %}
, d.event_timestamp AS "{{ featureview.name }}__feature_ts"
FROM "{{ featureview.name }}__data" d
),

{# --- CTE 2: Group boundaries via COUNT (only feature rows increment) --- #}
stacked_with_group AS (
"{{ featureview.name }}__grouped" AS (
SELECT *,
COUNT(feature_anchor) OVER (
PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}
ORDER BY event_timestamp ASC, is_spine ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS value_group_id
FROM stacked
FROM "{{ featureview.name }}__stacked"
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
),

{# --- CTE 3: Forward-fill features + feature timestamps via FIRST_VALUE --- #}
filled AS (
"{{ featureview.name }}__filled" AS (
SELECT
event_timestamp,
{% for entity in all_entities %}
"{{ entity }}",
{% endfor %}
is_spine
{% for feat in all_feature_cols %}
, FIRST_VALUE("{{ feat }}") OVER (
{% for feature in featureview.features %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
, FIRST_VALUE("{{ col_name }}") OVER (
PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id
ORDER BY event_timestamp ASC, is_spine ASC
) AS "{{ feat }}"
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS "{{ col_name }}"
{% endfor %}
{% for featureview in featureviews %}
, FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER (
PARTITION BY {% for entity in all_entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, value_group_id
ORDER BY event_timestamp ASC, is_spine ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS "{{ featureview.name }}__filled_ts"
{% endfor %}
FROM stacked_with_group
)
FROM "{{ featureview.name }}__grouped"
){% if not loop.last %},{% endif %}
{% endfor %}

{# --- CTE 4: Filter spine + TTL validation --- #}
{# --- Final: join per-FV filled results back onto spine --- #}
SELECT
event_timestamp,
spine.event_timestamp,
{% for entity in all_entities %}
"{{ entity }}",
spine."{{ entity }}",
{% endfor %}
{% set total_features = featureviews|map(attribute='features')|map('length')|sum %}
{% set feat_idx = namespace(count=0) %}
Expand All @@ -748,16 +742,23 @@ def _get_entity_schema(
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
{% if featureview.ttl != 0 %}
CASE WHEN (event_timestamp - "{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second
THEN "{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %}
CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= {{ featureview.ttl }} * interval '1' second
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TTL enforcement via CASE is correct — when filled_ts is NULL (no prior observation), the subtraction yields NULL, the comparison is UNKNOWN, and CASE falls through to ELSE NULL.

One edge case: if featureview.ttl is a very large number, {{ featureview.ttl }} * interval '1' second could overflow on some Postgres versions. Consider using make_interval(secs => {{ featureview.ttl }}) for safety, though in practice TTLs are unlikely to overflow.

THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %}
{% else %}
"{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %}
"{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %}
{% endif %}
{% endfor %}
{% endfor %}
FROM filled
WHERE is_spine = 1
ORDER BY event_timestamp
FROM spine
{% for featureview in featureviews %}
LEFT JOIN "{{ featureview.name }}__filled" AS "{{ featureview.name }}__f"
ON spine.event_timestamp = "{{ featureview.name }}__f".event_timestamp
{% for entity in all_entities %}
AND spine."{{ entity }}" IS NOT DISTINCT FROM "{{ featureview.name }}__f"."{{ entity }}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

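Good use of IS NOT DISTINCT FROM here. This fixes a bug in the old LATERAL join path where, for FVs with different entity sets, the entity comparison became NULL = NULL; that evaluates to UNKNOWN rather than TRUE, so the join silently matched no rows instead of raising an error. Worth calling this out in the PR description as a bug fix.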

{% endfor %}
AND "{{ featureview.name }}__f".is_spine = 1
{% endfor %}
ORDER BY spine.event_timestamp
{% endif %}
{% else %}
WITH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,18 +903,32 @@ def test_locf_template_multi_fv_date_range(self):
# No LATERAL joins
assert "LATERAL" not in query

# LOCF CTEs present
assert "stacked" in query.lower()
assert "stacked_with_group" in query.lower()
assert "filled" in query.lower()
# Per-FV LOCF CTEs present (independent forward-fill per feature view)
assert "fv1__stacked" in query
assert "fv1__grouped" in query
assert "fv1__filled" in query
assert "fv2__stacked" in query
assert "fv2__grouped" in query
assert "fv2__filled" in query
assert "spine" in query.lower()

# No global stacked/stacked_with_group/filled (replaced by per-FV CTEs)
assert '"stacked_with_group"' not in query

# Correct ORDER BY for deterministic grouping
assert "is_spine ASC" in query

# Explicit ROWS framing for deterministic window functions
assert "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" in query

# FIRST_VALUE used for forward-fill
assert "FIRST_VALUE" in query

# Per-FV filled results joined back to spine
assert "LEFT JOIN" in query
assert '"fv1__f"' in query or "fv1__filled" in query
assert '"fv2__f"' in query or "fv2__filled" in query

# TTL CASE for fv1 (ttl=86400)
assert "86400 * interval" in query
# TTL CASE for fv2 (ttl=3600)
Expand Down