diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index 074213839cc..dd7e1f7bf74 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -56,6 +56,7 @@ The transformation workflow typically involves: 3. **Chunking**: Split documents into smaller, semantically meaningful chunks 4. **Embedding Generation**: Convert text chunks into vector embeddings 5. **Storage**: Store embeddings and metadata in Feast's feature store + ### Feature Transformation for LLMs Feast supports transformations that can be used to: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5cc232d5fca..e644d58a012 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -656,7 +656,7 @@ def _make_inferences( def _get_feature_views_to_materialize( self, feature_views: Optional[List[str]], - ) -> List[FeatureView]: + ) -> List[Union[FeatureView, OnDemandFeatureView]]: """ Returns the list of feature views that should be materialized. @@ -669,34 +669,53 @@ def _get_feature_views_to_materialize( FeatureViewNotFoundException: One of the specified feature views could not be found. ValueError: One of the specified feature views is not configured for materialization. """ - feature_views_to_materialize: List[FeatureView] = [] + feature_views_to_materialize: List[Union[FeatureView, OnDemandFeatureView]] = [] if feature_views is None: - feature_views_to_materialize = utils._list_feature_views( + regular_feature_views = utils._list_feature_views( self._registry, self.project, hide_dummy_entity=False ) - feature_views_to_materialize = [ - fv for fv in feature_views_to_materialize if fv.online - ] + feature_views_to_materialize.extend( + [fv for fv in regular_feature_views if fv.online] + ) stream_feature_views_to_materialize = self._list_stream_feature_views( hide_dummy_entity=False ) - feature_views_to_materialize += [ - sfv for sfv in stream_feature_views_to_materialize if sfv.online - ] + feature_views_to_materialize.extend( + [sfv for sfv in stream_feature_views_to_materialize if sfv.online] + ) + on_demand_feature_views_to_materialize = self.list_on_demand_feature_views() + feature_views_to_materialize.extend( + [ + odfv + for odfv in on_demand_feature_views_to_materialize + if odfv.write_to_online_store + ] + ) else: for name in feature_views: + feature_view: Union[FeatureView, OnDemandFeatureView] try: feature_view = self._get_feature_view(name, hide_dummy_entity=False) except FeatureViewNotFoundException: - feature_view = self._get_stream_feature_view( - name, hide_dummy_entity=False - ) + try: + feature_view = self._get_stream_feature_view( + name, hide_dummy_entity=False + ) + except FeatureViewNotFoundException: + feature_view = self.get_on_demand_feature_view(name) - if not feature_view.online: + if hasattr(feature_view, "online") and not feature_view.online: raise ValueError( f"FeatureView {feature_view.name} is not configured to be served online." ) + elif ( + hasattr(feature_view, "write_to_online_store") + and not feature_view.write_to_online_store + ): + raise ValueError( + f"OnDemandFeatureView {feature_view.name} is not configured for write_to_online_store." + ) feature_views_to_materialize.append(feature_view) return feature_views_to_materialize @@ -866,7 +885,8 @@ def apply( views_to_update = [ ob for ob in objects - if ( + if + ( # BFVs are not handled separately from FVs right now. (isinstance(ob, FeatureView) or isinstance(ob, BatchFeatureView)) and not isinstance(ob, StreamFeatureView) @@ -1312,6 +1332,11 @@ def materialize_incremental( ) # TODO paging large loads for feature_view in feature_views_to_materialize: + from feast.on_demand_feature_view import OnDemandFeatureView + + if isinstance(feature_view, OnDemandFeatureView): + continue + start_date = feature_view.most_recent_end_time if start_date is None: if feature_view.ttl is None: @@ -1352,12 +1377,13 @@ def tqdm_builder(length): tqdm_builder=tqdm_builder, ) - self._registry.apply_materialization( - feature_view, - self.project, - start_date, - end_date, - ) + if not isinstance(feature_view, OnDemandFeatureView): + self._registry.apply_materialization( + feature_view, + self.project, + start_date, + end_date, + ) def materialize( self, @@ -1407,6 +1433,8 @@ def materialize( ) # TODO paging large loads for feature_view in feature_views_to_materialize: + from feast.on_demand_feature_view import OnDemandFeatureView + provider = self._get_provider() print(f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}:") @@ -1426,12 +1454,13 @@ def tqdm_builder(length): tqdm_builder=tqdm_builder, ) - self._registry.apply_materialization( - feature_view, - self.project, - start_date, - end_date, - ) + if not isinstance(feature_view, OnDemandFeatureView): + self._registry.apply_materialization( + feature_view, + self.project, + start_date, + end_date, + ) def _fvs_for_push_source_or_raise( self, push_source_name: str, allow_cache: bool @@ -2033,9 +2062,9 @@ def retrieve_online_documents_v2( distance_metric: The distance metric to use for retrieval. query_string: The query string to retrieve the closest document features using keyword search (bm25). """ - assert query is not None or query_string is not None, ( - "Either query or query_string must be provided." - ) + assert ( + query is not None or query_string is not None + ), "Either query or query_string must be provided." ( available_feature_views, @@ -2348,9 +2377,9 @@ def write_logged_features( if not isinstance(source, FeatureService): raise ValueError("Only feature service is currently supported as a source") - assert source.logging_config is not None, ( - "Feature service must be configured with logging config in order to use this functionality" - ) + assert ( + source.logging_config is not None + ), "Feature service must be configured with logging config in order to use this functionality" assert isinstance(logs, (pa.Table, Path)) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 2c2106f5a3e..d9f12e2c690 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -196,9 +196,9 @@ def __init__( else: features.append(field) - assert len([f for f in features if f.vector_index]) < 2, ( - f"Only one vector feature is allowed per feature view. Please update {self.name}." - ) + assert ( + len([f for f in features if f.vector_index]) < 2 + ), f"Only one vector feature is allowed per feature view. Please update {self.name}." # TODO(felixwang9817): Add more robust validation of features. cols = [field.name for field in schema] diff --git a/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py b/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py index 31c420613a8..187c48c2e93 100644 --- a/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py +++ b/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py @@ -187,9 +187,9 @@ def __init__( online_store: OnlineStore, **kwargs, ): - assert repo_config.offline_store.type == "snowflake.offline", ( - "To use Snowflake Compute Engine, you must use Snowflake as an offline store." - ) + assert ( + repo_config.offline_store.type == "snowflake.offline" + ), "To use Snowflake Compute Engine, you must use Snowflake as an offline store." super().__init__( repo_config=repo_config, @@ -210,11 +210,10 @@ def _materialize_one( project = task.project tqdm_builder = task.tqdm_builder if task.tqdm_builder else tqdm - assert isinstance(feature_view, BatchFeatureView) or isinstance( - feature_view, FeatureView - ), ( - "Snowflake can only materialize FeatureView & BatchFeatureView feature view types." - ) + assert ( + isinstance(feature_view, BatchFeatureView) + or isinstance(feature_view, FeatureView) + ), "Snowflake can only materialize FeatureView & BatchFeatureView feature view types." entities = [] for entity_name in feature_view.entities: diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 72359c4b793..b815552e93e 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -191,7 +191,9 @@ def evaluate_historical_retrieval(): ): # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC entity_df_with_features[entity_df_event_timestamp_col] = ( - entity_df_with_features[entity_df_event_timestamp_col].apply( + entity_df_with_features[ + entity_df_event_timestamp_col + ].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=timezone.utc) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 8eecb0a7866..3152f31fffc 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -326,9 +326,7 @@ def online_read( assert all( field in [f["name"] for f in collection["fields"]] for field in output_fields - ), ( - f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema" - ) + ), f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema" composite_entities = [] for entity_key in entity_keys: entity_key_str = serialize_entity_key( @@ -522,9 +520,7 @@ def retrieve_online_documents_v2( assert all( field in [f["name"] for f in collection["fields"]] for field in output_fields - ), ( - f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema" - ) + ), f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema" # Find the vector search field if we need it ann_search_field = None diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index b77185229d5..41ff938997a 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -460,9 +460,9 @@ def retrieve_online_documents_v2( where the first item is the event timestamp for the row, and the second item is a dict of feature name to embeddings. """ - assert embedding is not None or query_string is not None, ( - "Either embedding or query_string must be specified" - ) + assert ( + embedding is not None or query_string is not None + ), "Either embedding or query_string must be specified" raise NotImplementedError( f"Online store {self.__class__.__name__} does not support online retrieval" ) diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 29a6edf30ad..88101ab04dd 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -73,9 +73,9 @@ def _get_client(self, config: RepoConfig) -> QdrantClient: if self._client: return self._client online_store_config = config.online_store - assert isinstance(online_store_config, QdrantOnlineStoreConfig), ( - "Invalid type for online store config" - ) + assert isinstance( + online_store_config, QdrantOnlineStoreConfig + ), "Invalid type for online store config" assert online_store_config.similarity and ( online_store_config.similarity.lower() in DISTANCE_MAPPING diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 07180fe75ed..c6c253379da 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -790,12 +790,12 @@ def _get_vector_field(table: FeatureView) -> str: vector_fields: List[Field] = [ f for f in table.features if getattr(f, "vector_index", None) ] - assert len(vector_fields) > 0, ( - f"No vector field found, please update feature view = {table.name} to declare a vector field" - ) - assert len(vector_fields) < 2, ( - "Only one vector field is supported, please update feature view = {table.name} to declare one vector field" - ) + assert ( + len(vector_fields) > 0 + ), f"No vector field found, please update feature view = {table.name} to declare a vector field" + assert ( + len(vector_fields) < 2 + ), "Only one vector field is supported, please update feature view = {table.name} to declare one vector field" vector_field: str = vector_fields[0].name return vector_field diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index b532ac563d4..c4c90c45aa7 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -16,7 +16,7 @@ import pyarrow as pa from tqdm import tqdm -from feast import OnDemandFeatureView, importer +from feast import importer from feast.base_feature_view import BaseFeatureView from feast.batch_feature_view import BatchFeatureView from feast.data_source import DataSource @@ -38,6 +38,7 @@ from feast.infra.provider import Provider from feast.infra.registry.base_registry import BaseRegistry from feast.infra.supported_async_methods import ProviderAsyncMethods +from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -420,13 +421,22 @@ def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table) def materialize_single_feature_view( self, config: RepoConfig, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], start_date: datetime, end_date: datetime, registry: BaseRegistry, project: str, tqdm_builder: Callable[[int], tqdm], ) -> None: + from feast.on_demand_feature_view import OnDemandFeatureView + + if isinstance(feature_view, OnDemandFeatureView): + if not feature_view.write_to_online_store: + raise ValueError( + f"OnDemandFeatureView {feature_view.name} does not have write_to_online_store enabled" + ) + return + assert ( isinstance(feature_view, BatchFeatureView) or isinstance(feature_view, StreamFeatureView) @@ -496,9 +506,9 @@ def write_feature_service_logs( config: RepoConfig, registry: BaseRegistry, ): - assert feature_service.logging_config is not None, ( - "Logging should be configured for the feature service before calling this function" - ) + assert ( + feature_service.logging_config is not None + ), "Logging should be configured for the feature service before calling this function" self.offline_store.write_logged_features( config=config, @@ -516,9 +526,9 @@ def retrieve_feature_service_logs( config: RepoConfig, registry: BaseRegistry, ) -> RetrievalJob: - assert feature_service.logging_config is not None, ( - "Logging should be configured for the feature service before calling this function" - ) + assert ( + feature_service.logging_config is not None + ), "Logging should be configured for the feature service before calling this function" logging_source = FeatureServiceLoggingSource(feature_service, config.project) schema = logging_source.get_schema(registry) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 15917420af0..4f7b0d4b5c1 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -217,7 +217,7 @@ def ingest_df_to_offline_store( def materialize_single_feature_view( self, config: RepoConfig, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], start_date: datetime, end_date: datetime, registry: BaseRegistry, diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py index 9c7e04dfe31..7739939a744 100644 --- a/sdk/python/feast/offline_server.py +++ b/sdk/python/feast/offline_server.py @@ -266,15 +266,15 @@ def do_get(self, context: fl.ServerCallContext, ticket: fl.Ticket): return fl.RecordBatchStream(table) def _validate_offline_write_batch_parameters(self, command: dict): - assert "feature_view_names" in command, ( - "feature_view_names is a mandatory parameter" - ) + assert ( + "feature_view_names" in command + ), "feature_view_names is a mandatory parameter" assert "name_aliases" in command, "name_aliases is a mandatory parameter" feature_view_names = command["feature_view_names"] - assert len(feature_view_names) == 1, ( - "feature_view_names list should only have one item" - ) + assert ( + len(feature_view_names) == 1 + ), "feature_view_names list should only have one item" name_aliases = command["name_aliases"] assert len(name_aliases) == 1, "name_aliases list should only have one item" @@ -316,9 +316,9 @@ def write_logged_features(self, command: dict, key: str): command["feature_service_name"] ) - assert feature_service.logging_config is not None, ( - "feature service must have logging_config set" - ) + assert ( + feature_service.logging_config is not None + ), "feature service must have logging_config set" assert_permissions( resource=feature_service, @@ -335,15 +335,15 @@ def write_logged_features(self, command: dict, key: str): ) def _validate_pull_all_from_table_or_query_parameters(self, command: dict): - assert "data_source_name" in command, ( - "data_source_name is a mandatory parameter" - ) - assert "join_key_columns" in command, ( - "join_key_columns is a mandatory parameter" - ) - assert "feature_name_columns" in command, ( - "feature_name_columns is a mandatory parameter" - ) + assert ( + "data_source_name" in command + ), "data_source_name is a mandatory parameter" + assert ( + "join_key_columns" in command + ), "join_key_columns is a mandatory parameter" + assert ( + "feature_name_columns" in command + ), "feature_name_columns is a mandatory parameter" assert "timestamp_field" in command, "timestamp_field is a mandatory parameter" assert "start_date" in command, "start_date is a mandatory parameter" assert "end_date" in command, "end_date is a mandatory parameter" @@ -367,15 +367,15 @@ def pull_all_from_table_or_query(self, command: dict): ) def _validate_pull_latest_from_table_or_query_parameters(self, command: dict): - assert "data_source_name" in command, ( - "data_source_name is a mandatory parameter" - ) - assert "join_key_columns" in command, ( - "join_key_columns is a mandatory parameter" - ) - assert "feature_name_columns" in command, ( - "feature_name_columns is a mandatory parameter" - ) + assert ( + "data_source_name" in command + ), "data_source_name is a mandatory parameter" + assert ( + "join_key_columns" in command + ), "join_key_columns is a mandatory parameter" + assert ( + "feature_name_columns" in command + ), "feature_name_columns is a mandatory parameter" assert "timestamp_field" in command, "timestamp_field is a mandatory parameter" assert "start_date" in command, "start_date is a mandatory parameter" assert "end_date" in command, "end_date is a mandatory parameter" diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 3abc99e3444..8d71280fdf9 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -463,13 +463,13 @@ def _python_value_to_proto_value( # Numpy convert 0 to int. However, in the feature view definition, the type of column may be a float. # So, if value is 0, type validation must pass if scalar_types are either int or float. allowed_types = {np.int64, int, np.float64, float, decimal.Decimal} - assert type(sample) in allowed_types, ( - f"Type `{type(sample)}` not in {allowed_types}" - ) + assert ( + type(sample) in allowed_types + ), f"Type `{type(sample)}` not in {allowed_types}" else: - assert type(sample) in valid_scalar_types, ( - f"Type `{type(sample)}` not in {valid_scalar_types}" - ) + assert ( + type(sample) in valid_scalar_types + ), f"Type `{type(sample)}` not in {valid_scalar_types}" if feast_value_type == ValueType.BOOL: # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_. return [ diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 7a31489ac5f..b8bcb6e030b 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -224,9 +224,9 @@ def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType: Raises: ValueError: The conversion could not be performed. """ - assert isinstance(feast_type, (ComplexFeastType, PrimitiveFeastType)), ( - f"Expected FeastType, got {type(feast_type)}" - ) + assert isinstance( + feast_type, (ComplexFeastType, PrimitiveFeastType) + ), f"Expected FeastType, got {type(feast_type)}" if isinstance(feast_type, PrimitiveFeastType): if feast_type in FEAST_TYPES_TO_PYARROW_TYPES: return FEAST_TYPES_TO_PYARROW_TYPES[feast_type] diff --git a/sdk/python/tests/integration/compute_engines/spark/test_compute.py b/sdk/python/tests/integration/compute_engines/spark/test_compute.py index 5254db1e690..406d429db41 100644 --- a/sdk/python/tests/integration/compute_engines/spark/test_compute.py +++ b/sdk/python/tests/integration/compute_engines/spark/test_compute.py @@ -294,9 +294,9 @@ def _check_online_features( assert len(online_response["driver_id"]) == 1 assert online_response["driver_id"][0] == driver_id - assert abs(online_response[feature_ref][0] - expected_value < 1e-6), ( - "Transformed result" - ) + assert abs( + online_response[feature_ref][0] - expected_value < 1e-6 + ), "Transformed result" def _check_offline_features( diff --git a/sdk/python/tests/integration/materialization/test_snowflake.py b/sdk/python/tests/integration/materialization/test_snowflake.py index a783eac0380..5f01641c3b5 100644 --- a/sdk/python/tests/integration/materialization/test_snowflake.py +++ b/sdk/python/tests/integration/materialization/test_snowflake.py @@ -178,9 +178,9 @@ def test_snowflake_materialization_consistency_internal_with_lists( assert actual_value is not None, f"Response: {response_dict}" if feature_dtype == "float": for actual_num, expected_num in zip(actual_value, expected_value): - assert abs(actual_num - expected_num) < 1e-6, ( - f"Response: {response_dict}, Expected: {expected_value}" - ) + assert ( + abs(actual_num - expected_num) < 1e-6 + ), f"Response: {response_dict}, Expected: {expected_value}" else: assert actual_value == expected_value diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 5ba99b9d7f1..2586b8c0f74 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -171,9 +171,9 @@ def test_feature_get_online_features_types_match( if config.feature_is_list: for feature in online_features["value"]: assert isinstance(feature, list), "Feature value should be a list" - assert config.has_empty_list or len(feature) > 0, ( - "List of values should not be empty" - ) + assert ( + config.has_empty_list or len(feature) > 0 + ), "List of values should not be empty" for element in feature: assert isinstance(element, expected_dtype) else: @@ -224,9 +224,7 @@ def assert_expected_historical_feature_types( dtype_checkers = feature_dtype_to_expected_historical_feature_dtype[feature_dtype] assert any( check(historical_features_df.dtypes["value"]) for check in dtype_checkers - ), ( - f"Failed to match feature type {historical_features_df.dtypes['value']} with checkers {dtype_checkers}" - ) + ), f"Failed to match feature type {historical_features_df.dtypes['value']} with checkers {dtype_checkers}" def assert_feature_list_types( diff --git a/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py b/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py index d692d0f957a..59caaf0b5f2 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py @@ -56,9 +56,9 @@ def test_to_remote_storage(retrieval_job): retrieval_job, "_get_file_names_from_copy_into", return_value=stored_files ) as mock_get_file_names_from_copy, ): - assert retrieval_job.to_remote_storage() == stored_files, ( - "should return the list of files" - ) + assert ( + retrieval_job.to_remote_storage() == stored_files + ), "should return the list of files" mock_to_snowflake.assert_called_once() mock_get_file_names_from_copy.assert_called_once_with(ANY, ANY) native_path = mock_get_file_names_from_copy.call_args[0][1] diff --git a/sdk/python/tests/unit/permissions/test_oidc_auth_client.py b/sdk/python/tests/unit/permissions/test_oidc_auth_client.py index 3d74eb2a55f..68aec70fc79 100644 --- a/sdk/python/tests/unit/permissions/test_oidc_auth_client.py +++ b/sdk/python/tests/unit/permissions/test_oidc_auth_client.py @@ -58,6 +58,6 @@ def _assert_auth_requests_session( "Authorization header is missing in object of class: " "AuthenticatedRequestsSession " ) - assert auth_req_session.headers["Authorization"] == f"Bearer {expected_token}", ( - "Authorization token is incorrect" - ) + assert ( + auth_req_session.headers["Authorization"] == f"Bearer {expected_token}" + ), "Authorization token is incorrect" diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index eb29c645e53..ab1ebe004d9 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -1117,6 +1117,179 @@ def python_stored_writes_feature_view( "current_datetime": [None], } + def test_materialize_with_odfv_writes(self): + with tempfile.TemporaryDirectory() as data_dir: + self.store = FeatureStore( + config=RepoConfig( + project="test_on_demand_python_transformation", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=3, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db") + ), + ) + ) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet( + path=driver_stats_path, allow_truncated_timestamps=True + ) + + driver = Entity(name="driver", join_keys=["driver_id"]) + + driver_stats_source = FileSource( + name="driver_hourly_stats_source", + path=driver_stats_path, + timestamp_field="event_timestamp", + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, + tags={}, + ) + + input_request_source = RequestSource( + name="vals_to_add", + schema=[ + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + ], + ) + + @on_demand_feature_view( + entities=[driver], + sources=[ + driver_stats_fv[["conv_rate", "acc_rate"]], + input_request_source, + ], + schema=[ + Field(name="conv_rate_plus_acc", dtype=Float64), + Field(name="current_datetime", dtype=UnixTimestamp), + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + Field(name="string_constant", dtype=String), + ], + mode="python", + write_to_online_store=True, + ) + def python_stored_writes_feature_view( + inputs: dict[str, Any], + ) -> dict[str, Any]: + output: dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ], + "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + "counter": [c + 1 for c in inputs["counter"]], + "input_datetime": [d for d in inputs["input_datetime"]], + "string_constant": ["test_constant"], + } + return output + + @on_demand_feature_view( + entities=[driver], + sources=[ + driver_stats_fv[["conv_rate", "acc_rate"]], + input_request_source, + ], + schema=[ + Field(name="conv_rate_plus_acc", dtype=Float64), + Field(name="current_datetime", dtype=UnixTimestamp), + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + Field(name="string_constant", dtype=String), + ], + mode="python", + write_to_online_store=False, + ) + def python_no_writes_feature_view( + inputs: dict[str, Any], + ) -> dict[str, Any]: + output: dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ], + "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + "counter": [c + 1 for c in inputs["counter"]], + "input_datetime": [d for d in inputs["input_datetime"]], + "string_constant": ["test_constant"], + } + return output + + self.store.apply( + [ + driver, + driver_stats_source, + driver_stats_fv, + python_stored_writes_feature_view, + python_no_writes_feature_view, + ] + ) + + feature_views_to_materialize = self.store._get_feature_views_to_materialize( + None + ) + + odfv_names = [ + fv.name + for fv in feature_views_to_materialize + if hasattr(fv, "write_to_online_store") + ] + assert "python_stored_writes_feature_view" in odfv_names + assert "python_no_writes_feature_view" not in odfv_names + + regular_fv_names = [ + fv.name + for fv in feature_views_to_materialize + if not hasattr(fv, "write_to_online_store") + ] + assert "driver_hourly_stats" in regular_fv_names + + materialize_end_date = datetime.now().replace( + microsecond=0, second=0, minute=0 + ) + materialize_start_date = materialize_end_date - timedelta(days=1) + + self.store.materialize(materialize_start_date, materialize_end_date) + + specific_feature_views_to_materialize = ( + self.store._get_feature_views_to_materialize( + ["driver_hourly_stats", "python_stored_writes_feature_view"] + ) + ) + assert len(specific_feature_views_to_materialize) == 2 + + try: + self.store._get_feature_views_to_materialize( + ["python_no_writes_feature_view"] + ) + assert False, "Should have raised ValueError for ODFV without write_to_online_store" + except ValueError as e: + assert "not configured for write_to_online_store" in str(e) + def test_stored_writes_with_explode(self): with tempfile.TemporaryDirectory() as data_dir: self.store = FeatureStore( diff --git a/sdk/python/tests/unit/test_repo_operations_validate_feast_project_name.py b/sdk/python/tests/unit/test_repo_operations_validate_feast_project_name.py index 33d1d5307d6..0dc4b2651b0 100644 --- a/sdk/python/tests/unit/test_repo_operations_validate_feast_project_name.py +++ b/sdk/python/tests/unit/test_repo_operations_validate_feast_project_name.py @@ -21,6 +21,6 @@ def test_is_valid_name(): ] for name, expected in test_cases: - assert is_valid_name(name) == expected, ( - f"Failed for project invalid name: {name}" - ) + assert ( + is_valid_name(name) == expected + ), f"Failed for project invalid name: {name}" diff --git a/sdk/python/tests/utils/auth_permissions_util.py b/sdk/python/tests/utils/auth_permissions_util.py index dcc456e1d82..8a1e7b7c4d7 100644 --- a/sdk/python/tests/utils/auth_permissions_util.py +++ b/sdk/python/tests/utils/auth_permissions_util.py @@ -101,9 +101,9 @@ def start_feature_server( timeout_msg="Unable to start the Prometheus server in 60 seconds.", ) else: - assert not check_port_open("localhost", 8000), ( - "Prometheus server is running when it should be disabled." - ) + assert not check_port_open( + "localhost", 8000 + ), "Prometheus server is running when it should be disabled." online_server_url = ( f"https://localhost:{server_port}" diff --git a/sdk/python/tests/utils/cli_repo_creator.py b/sdk/python/tests/utils/cli_repo_creator.py index 4b8f9aad04b..34b798b06f3 100644 --- a/sdk/python/tests/utils/cli_repo_creator.py +++ b/sdk/python/tests/utils/cli_repo_creator.py @@ -117,9 +117,9 @@ def local_repo( stderr = result.stderr.decode("utf-8") print(f"Apply stdout:\n{stdout}") print(f"Apply stderr:\n{stderr}") - assert result.returncode == 0, ( - f"stdout: {result.stdout}\nstderr: {result.stderr}" - ) + assert ( + result.returncode == 0 + ), f"stdout: {result.stdout}\nstderr: {result.stderr}" yield FeatureStore(repo_path=str(repo_path), config=None) @@ -129,6 +129,6 @@ def local_repo( stderr = result.stderr.decode("utf-8") print(f"Apply stdout:\n{stdout}") print(f"Apply stderr:\n{stderr}") - assert result.returncode == 0, ( - f"stdout: {result.stdout}\nstderr: {result.stderr}" - ) + assert ( + result.returncode == 0 + ), f"stdout: {result.stdout}\nstderr: {result.stderr}" diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index ed66aead87d..a08e8fef429 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -131,17 +131,17 @@ def _check_offline_and_online_features( if full_feature_names: if expected_value: assert response_dict[f"{fv.name}__value"][0], f"Response: {response_dict}" - assert abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6, ( - f"Response: {response_dict}, Expected: {expected_value}" - ) + assert ( + abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 + ), f"Response: {response_dict}, Expected: {expected_value}" else: assert response_dict[f"{fv.name}__value"][0] is None else: if expected_value: assert response_dict["value"][0], f"Response: {response_dict}" - assert abs(response_dict["value"][0] - expected_value) < 1e-6, ( - f"Response: {response_dict}, Expected: {expected_value}" - ) + assert ( + abs(response_dict["value"][0] - expected_value) < 1e-6 + ), f"Response: {response_dict}, Expected: {expected_value}" else: assert response_dict["value"][0] is None