From 1752a4fc72f72ac5e7c059f028940e656db5b50b Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Wed, 15 Oct 2025 14:48:36 +0300 Subject: [PATCH 1/3] add pull_all_from_table_or_query for clickhouse, to align with new materialization logic (calling it) Signed-off-by: lukas.valatka --- .../clickhouse_offline_store/clickhouse.py | 37 +++++++++++++++++++ .../registration/test_universal_types.py | 1 - 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py index bca6339fb15..5e8cf3d9053 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py @@ -191,6 +191,43 @@ def pull_latest_from_table_or_query( on_demand_feature_views=None, ) + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) + assert isinstance(data_source, ClickhouseSource) + + from_expression = data_source.get_table_query_string() + + timestamp_fields = [timestamp_field] + + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) + + field_string = ", ".join( + join_key_columns + feature_name_columns + timestamp_fields + ) + + query = f""" + SELECT {field_string} + FROM {from_expression} + WHERE {timestamp_field} BETWEEN parseDateTimeBestEffort('{start_date}') AND parseDateTimeBestEffort('{end_date}') + """ + + return ClickhouseRetrievalJob( + query=query, + config=config, + full_feature_names=False, + ) + class ClickhouseRetrievalJob(PostgreSQLRetrievalJob): def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 5ba99b9d7f1..b464cf2f766 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -343,7 +343,6 @@ def offline_types_test_fixtures(request, environment): if ( environment.data_source_creator.__class__.__name__ == "ClickhouseDataSourceCreator" - and config.feature_dtype in {"float", "datetime", "bool"} and config.feature_is_list and not config.has_empty_list ): From 90c9b66ae659463124923ed2517cecfda923e476 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Wed, 5 Nov 2025 13:24:57 +0200 Subject: [PATCH 2/3] add option to select to materialize only latest values, for performance Signed-off-by: lukas.valatka --- .../feast/infra/compute_engines/utils.py | 39 +++++++++++++------ sdk/python/feast/repo_config.py | 13 +++++++ .../feature_repos/repo_configuration.py | 6 ++- .../test_universal_materialization.py | 7 +++- 4 files changed, 51 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/utils.py b/sdk/python/feast/infra/compute_engines/utils.py index 20a3dae981d..94fd3098c11 100644 --- a/sdk/python/feast/infra/compute_engines/utils.py +++ b/sdk/python/feast/infra/compute_engines/utils.py @@ -21,20 +21,35 @@ def create_offline_store_retrieval_job( context: start_time: end_time: - Returns: """ offline_store = context.offline_store - # 📥 Reuse Feast's robust query resolver - retrieval_job = offline_store.pull_all_from_table_or_query( - config=context.repo_config, - data_source=data_source, - join_key_columns=column_info.join_keys, - feature_name_columns=column_info.feature_cols, - timestamp_field=column_info.ts_col, - created_timestamp_column=column_info.created_ts_col, - start_date=start_time, - end_date=end_time, - ) + + pull_latest = context.repo_config.materialization_config.pull_latest_features + + if pull_latest: + retrieval_job = offline_store.pull_latest_from_table_or_query( + config=context.repo_config, + data_source=data_source, + join_key_columns=column_info.join_keys, + feature_name_columns=column_info.feature_cols, + timestamp_field=column_info.ts_col, + created_timestamp_column=column_info.created_ts_col, + start_date=start_time, + end_date=end_time, + ) + else: + # 📥 Reuse Feast's robust query resolver + retrieval_job = offline_store.pull_all_from_table_or_query( + config=context.repo_config, + data_source=data_source, + join_key_columns=column_info.join_keys, + feature_name_columns=column_info.feature_cols, + timestamp_field=column_info.ts_col, + created_timestamp_column=column_info.created_ts_col, + start_date=start_time, + end_date=end_time, + ) + return retrieval_job diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 895002948f1..ac4383e1142 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -182,6 +182,14 @@ def validate_path(cls, path: str, values: ValidationInfo) -> str: return path +class MaterializationConfig(BaseModel): + """Configuration options for feature materialization behavior.""" + + pull_latest_features: StrictBool = False + """ bool: If true, feature retrieval jobs will only pull the latest feature values for each entity. + If false, feature retrieval jobs will pull all feature values within the specified time range. """ + + class RepoConfig(FeastBaseModel): """Repo config. Typically loaded from `feature_store.yaml`""" @@ -239,6 +247,11 @@ class RepoConfig(FeastBaseModel): coerce_tz_aware: Optional[bool] = True """ If True, coerces entity_df timestamp columns to be timezone aware (to UTC by default). """ + materialization_config: MaterializationConfig = Field( + MaterializationConfig(), alias="materialization" + ) + """ MaterializationConfig: Configuration options for feature materialization behavior. """ + def __init__(self, **data: Any): super().__init__(**data) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 89a13df69ed..14e60cb7cf9 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -34,7 +34,7 @@ from feast.permissions.auth_model import OidcClientAuthConfig from feast.permissions.permission import Permission from feast.permissions.policy import RoleBasedPolicy -from feast.repo_config import RegistryConfig, RepoConfig +from feast.repo_config import MaterializationConfig, RegistryConfig, RepoConfig from feast.utils import _utc_now from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, @@ -423,6 +423,9 @@ class Environment: entity_key_serialization_version: int repo_dir_name: str fixture_request: Optional[pytest.FixtureRequest] = None + materialization: MaterializationConfig = dataclasses.field( + default_factory=lambda: MaterializationConfig() + ) def __post_init__(self): self.end_date = _utc_now().replace(microsecond=0, second=0, minute=0) @@ -443,6 +446,7 @@ def setup(self): repo_path=self.repo_dir_name, feature_server=self.feature_server, entity_key_serialization_version=self.entity_key_serialization_version, + materialization_config=self.materialization, ) self.feature_store = FeatureStore(config=self.config) diff --git a/sdk/python/tests/integration/materialization/test_universal_materialization.py b/sdk/python/tests/integration/materialization/test_universal_materialization.py index 860e9a5fc6c..cf15746bf9e 100644 --- a/sdk/python/tests/integration/materialization/test_universal_materialization.py +++ b/sdk/python/tests/integration/materialization/test_universal_materialization.py @@ -219,7 +219,12 @@ def odfv_multi(df: pd.DataFrame) -> pd.DataFrame: @pytest.mark.integration @pytest.mark.universal_offline_stores -def test_universal_materialization_consistency(environment): +@pytest.mark.parametrize("materialization_pull_latest", [True, False]) +def test_universal_materialization_consistency( + environment, materialization_pull_latest +): + environment.materialization.pull_latest_features = materialization_pull_latest + fs = environment.feature_store df = create_basic_driver_dataset() ds = environment.data_source_creator.create_data_source( From 42f9c1b9aaba9adb308648dfe92b988c223e5076 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Wed, 5 Nov 2025 13:38:49 +0200 Subject: [PATCH 3/3] enforce non optional params Signed-off-by: lukas.valatka --- sdk/python/feast/infra/compute_engines/utils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/feast/infra/compute_engines/utils.py b/sdk/python/feast/infra/compute_engines/utils.py index 94fd3098c11..d2c49305376 100644 --- a/sdk/python/feast/infra/compute_engines/utils.py +++ b/sdk/python/feast/infra/compute_engines/utils.py @@ -29,6 +29,11 @@ def create_offline_store_retrieval_job( pull_latest = context.repo_config.materialization_config.pull_latest_features if pull_latest: + if not start_time or not end_time: + raise ValueError( + "start_time and end_time must be provided when pull_latest_features is True" + ) + retrieval_job = offline_store.pull_latest_from_table_or_query( config=context.repo_config, data_source=data_source,