Skip to content

Commit 928431b

Browse files
YassinNouh21 and franciscojavierarceo
authored and committed
fix(postgres): Use end_date in synthetic entity_df for non-entity retrieval (feast-dev#6110)
* fix(postgres): Use end_date instead of start_date in synthetic entity_df for non-entity retrieval The non-entity retrieval path created a synthetic entity_df using pd.date_range(start=start_date, ...)[:1], which placed start_date as the event_timestamp. Since PIT joins use MAX(entity_timestamp) as the upper bound for feature data filtering, using start_date made end_date unreachable — no features after start_date would be returned. Fix: use [end_date] directly, matching the ClickHouse implementation (PR feast-dev#6066) and the Dask offline store behavior. Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> * fix: preserve timestamp range for min_event_timestamp and fix formatting The entity_df fix alone would cause min_event_timestamp to be computed as end_date - TTL (instead of start_date - TTL), clipping valid data from the query window. Override entity_df_event_timestamp_range to (start_date, end_date) in non-entity mode so the full range is used. Also fix ruff formatting in the test file. Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> * test: add integration test for non-entity retrieval Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> --------- Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> Co-authored-by: Francisco Javier Arceo <arceofrancisco@gmail.com> Signed-off-by: Shizoqua <hr.lanreshittu@gmail.com>
1 parent 6635515 commit 928431b

3 files changed

Lines changed: 190 additions & 12 deletions

File tree

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -159,25 +159,26 @@ def get_historical_features(
159159
else:
160160
start_date = make_tzaware(start_date)
161161

162-
entity_df = pd.DataFrame(
163-
{
164-
"event_timestamp": pd.date_range(
165-
start=start_date, end=end_date, freq="1s", tz=timezone.utc
166-
)[:1] # Just one row
167-
}
168-
)
162+
entity_df = pd.DataFrame({"event_timestamp": [end_date]})
169163

170164
entity_schema = _get_entity_schema(entity_df, config)
171165

172166
entity_df_event_timestamp_col = (
173167
offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
174168
)
175169

176-
entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
177-
entity_df,
178-
entity_df_event_timestamp_col,
179-
config,
180-
)
170+
# In non-entity mode, use the actual requested range so that
171+
# min_event_timestamp (= range[0] - TTL) doesn't clip the window.
172+
# The synthetic entity_df only has end_date, which would wrongly
173+
# set min_event_timestamp to end_date - TTL instead of start_date - TTL.
174+
if start_date is not None and end_date is not None:
175+
entity_df_event_timestamp_range = (start_date, end_date)
176+
else:
177+
entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
178+
entity_df,
179+
entity_df_event_timestamp_col,
180+
config,
181+
)
181182

182183
@contextlib.contextmanager
183184
def query_generator() -> Iterator[str]:

sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,3 +728,115 @@ def test_historical_features_field_mapping(
728728
actual_df,
729729
sort_by=["driver_id"],
730730
)
731+
732+
733+
@pytest.mark.integration
734+
@pytest.mark.universal_offline_stores(only=["file"])
735+
def test_historical_features_non_entity_retrieval(environment):
736+
"""Test get_historical_features with entity_df=None using start_date/end_date.
737+
738+
This exercises the non-entity retrieval path where a synthetic entity_df is
739+
generated internally. Regression test for the bug where start_date was used
740+
instead of end_date for min_event_timestamp in the synthetic entity_df.
741+
"""
742+
store = environment.feature_store
743+
744+
now = datetime.now().replace(microsecond=0, second=0, minute=0)
745+
two_days_ago = now - timedelta(days=2)
746+
one_day_ago = now - timedelta(days=1)
747+
748+
driver_stats_df = pd.DataFrame(
749+
data=[
750+
{
751+
"driver_id": 1001,
752+
"avg_daily_trips": 10,
753+
"event_timestamp": two_days_ago,
754+
"created": two_days_ago,
755+
},
756+
{
757+
"driver_id": 1001,
758+
"avg_daily_trips": 20,
759+
"event_timestamp": one_day_ago,
760+
"created": one_day_ago,
761+
},
762+
{
763+
"driver_id": 1001,
764+
"avg_daily_trips": 30,
765+
"event_timestamp": now,
766+
"created": now,
767+
},
768+
{
769+
"driver_id": 1002,
770+
"avg_daily_trips": 100,
771+
"event_timestamp": two_days_ago,
772+
"created": two_days_ago,
773+
},
774+
{
775+
"driver_id": 1002,
776+
"avg_daily_trips": 200,
777+
"event_timestamp": one_day_ago,
778+
"created": one_day_ago,
779+
},
780+
{
781+
"driver_id": 1002,
782+
"avg_daily_trips": 300,
783+
"event_timestamp": now,
784+
"created": now,
785+
},
786+
]
787+
)
788+
789+
start_date = now - timedelta(days=3)
790+
end_date = now + timedelta(hours=1)
791+
792+
driver_stats_data_source = environment.data_source_creator.create_data_source(
793+
df=driver_stats_df,
794+
destination_name=f"test_driver_stats_{int(time.time_ns())}_{random.randint(1000, 9999)}",
795+
timestamp_field="event_timestamp",
796+
created_timestamp_column="created",
797+
)
798+
799+
driver_entity = Entity(name="driver", join_keys=["driver_id"])
800+
driver_fv = FeatureView(
801+
name="driver_stats",
802+
entities=[driver_entity],
803+
schema=[Field(name="avg_daily_trips", dtype=Int32)],
804+
source=driver_stats_data_source,
805+
)
806+
807+
store.apply([driver_entity, driver_fv])
808+
809+
offline_job = store.get_historical_features(
810+
entity_df=None,
811+
features=["driver_stats:avg_daily_trips"],
812+
full_feature_names=False,
813+
start_date=start_date,
814+
end_date=end_date,
815+
)
816+
817+
actual_df = offline_job.to_df()
818+
819+
assert not actual_df.empty, "Result should not be empty"
820+
assert "avg_daily_trips" in actual_df.columns
821+
822+
actual_driver_ids = set(actual_df["driver_id"].tolist())
823+
assert 1001 in actual_driver_ids, "driver 1001 should be in results"
824+
assert 1002 in actual_driver_ids, "driver 1002 should be in results"
825+
826+
# Verify timestamps fall within the requested range.
827+
# Strip tz info to avoid tz-naive vs tz-aware comparison issues.
828+
ts_start = pd.Timestamp(start_date).tz_localize(None)
829+
ts_end = pd.Timestamp(end_date).tz_localize(None)
830+
for ts in actual_df["event_timestamp"]:
831+
ts_val = pd.Timestamp(ts).tz_localize(None)
832+
assert ts_val >= ts_start, f"Timestamp {ts_val} before start_date"
833+
assert ts_val <= ts_end, f"Timestamp {ts_val} after end_date"
834+
835+
# The latest features must be present -- this is the critical regression check.
836+
# With the old bug (using start_date instead of end_date), the synthetic entity_df
837+
# had wrong max_event_timestamp causing the latest rows to be missed.
838+
actual_trips = set(actual_df["avg_daily_trips"].tolist())
839+
assert 30 in actual_trips, "Latest trip value 30 for driver 1001 should be present"
840+
assert 300 in actual_trips, (
841+
"Latest trip value 300 for driver 1002 should be present"
842+
)

sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,71 @@ def test_non_entity_mode_with_both_dates(self):
615615
assert "start_date" not in str(e)
616616
assert "end_date" not in str(e)
617617

618+
def test_non_entity_entity_df_uses_end_date(self):
619+
"""Test that the synthetic entity_df uses end_date, not start_date.
620+
621+
Regression test: the old code used pd.date_range(start=start_date, ...)[:1]
622+
which put start_date in the entity_df. Since PIT joins use
623+
MAX(entity_timestamp) as the upper bound, start_date made end_date
624+
unreachable. The fix uses [end_date] directly.
625+
"""
626+
test_repo_config = RepoConfig(
627+
project="test_project",
628+
registry="test_registry",
629+
provider="local",
630+
offline_store=_mock_offline_store_config(),
631+
)
632+
633+
feature_view = _mock_feature_view("test_fv", ttl=timedelta(days=1))
634+
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
635+
end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)
636+
637+
mock_get_entity_schema = MagicMock(
638+
return_value={"event_timestamp": "timestamp"}
639+
)
640+
641+
with (
642+
patch.multiple(
643+
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres",
644+
_get_conn=MagicMock(),
645+
_upload_entity_df=MagicMock(),
646+
_get_entity_schema=mock_get_entity_schema,
647+
_get_entity_df_event_timestamp_range=MagicMock(
648+
return_value=(start_date, end_date)
649+
),
650+
),
651+
patch(
652+
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys",
653+
return_value=[],
654+
),
655+
patch(
656+
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df",
657+
),
658+
patch(
659+
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context",
660+
return_value=[],
661+
),
662+
):
663+
PostgreSQLOfflineStore.get_historical_features(
664+
config=test_repo_config,
665+
feature_views=[feature_view],
666+
feature_refs=["test_fv:feature1"],
667+
entity_df=None,
668+
registry=MagicMock(),
669+
project="test_project",
670+
start_date=start_date,
671+
end_date=end_date,
672+
)
673+
674+
# _get_entity_schema is called with the synthetic entity_df
675+
df = mock_get_entity_schema.call_args[0][0]
676+
assert len(df) == 1
677+
ts = df["event_timestamp"].iloc[0]
678+
# The entity_df must use end_date, not start_date
679+
assert ts == end_date, (
680+
f"entity_df timestamp should be end_date ({end_date}), got {ts}"
681+
)
682+
618683
def test_non_entity_mode_with_end_date_only(self):
619684
"""Test non-entity retrieval calculates start_date from TTL"""
620685
test_repo_config = RepoConfig(

0 commit comments

Comments
 (0)