diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index ff41400eace..1a85a4b90c0 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -311,6 +311,16 @@ def with_join_key_map(self, join_key_map: Dict[str, str]): return cp + def update_materialization_intervals( + self, existing_materialization_intervals: List[Tuple[datetime, datetime]] + ): + if ( + len(existing_materialization_intervals) > 0 + and len(self.materialization_intervals) == 0 + ): + for interval in existing_materialization_intervals: + self.materialization_intervals.append((interval[0], interval[1])) + def to_proto(self) -> FeatureViewProto: """ Converts a feature view object to its protobuf representation. diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index b52749a9b2f..55012b97db3 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -29,7 +29,19 @@ from feast.infra.infra_object import Infra from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureService as FeatureServiceProto, +) +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureView as OnDemandFeatureViewProto, +) from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView from feast.transformation.pandas_transformation import PandasTransformation @@ -678,3 +690,19 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: self._message_to_sorted_dict(infra_object.to_proto()) ) return registry_dict + + @staticmethod + def deserialize_registry_values(serialized_proto, feast_obj_type) -> Any: + if feast_obj_type == Entity: + return EntityProto.FromString(serialized_proto) + if feast_obj_type == SavedDataset: + return SavedDatasetProto.FromString(serialized_proto) + if feast_obj_type == FeatureView: + return FeatureViewProto.FromString(serialized_proto) + if feast_obj_type == StreamFeatureView: + return StreamFeatureViewProto.FromString(serialized_proto) + if feast_obj_type == OnDemandFeatureView: + return OnDemandFeatureViewProto.FromString(serialized_proto) + if feast_obj_type == FeatureService: + return FeatureServiceProto.FromString(serialized_proto) + return None diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index df1a419ccf7..d4fa4e5330b 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -265,9 +265,13 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): existing_entity_proto.spec.name == entity_proto.spec.name and existing_entity_proto.spec.project == project ): + entity.created_timestamp = ( + existing_entity_proto.meta.created_timestamp.ToDatetime() + ) + entity_proto = entity.to_proto() + entity_proto.spec.project = project del self.cached_registry_proto.entities[idx] break - self.cached_registry_proto.entities.append(entity_proto) if commit: self.commit() @@ -338,6 +342,11 @@ def apply_feature_service( == feature_service_proto.spec.name and existing_feature_service_proto.spec.project == project ): + feature_service.created_timestamp = ( + existing_feature_service_proto.meta.created_timestamp.ToDatetime() + ) + feature_service_proto = feature_service.to_proto() + feature_service_proto.spec.project = project del registry.feature_services[idx] registry.feature_services.append(feature_service_proto) if commit: @@ -410,6 +419,18 @@ def apply_feature_view( ): return else: + existing_feature_view = type(feature_view).from_proto( + existing_feature_view_proto + ) + feature_view.created_timestamp = ( + existing_feature_view.created_timestamp + ) + if isinstance(feature_view, (FeatureView, StreamFeatureView)): + feature_view.update_materialization_intervals( + existing_feature_view.materialization_intervals + ) + feature_view_proto = feature_view.to_proto() + feature_view_proto.spec.project = project del existing_feature_views_of_same_type[idx] break @@ -638,6 +659,17 @@ def apply_saved_dataset( existing_saved_dataset_proto.spec.name == saved_dataset_proto.spec.name and existing_saved_dataset_proto.spec.project == project ): + saved_dataset.created_timestamp = ( + existing_saved_dataset_proto.meta.created_timestamp.ToDatetime() + ) + saved_dataset.min_event_timestamp = ( + existing_saved_dataset_proto.meta.min_event_timestamp.ToDatetime() + ) + saved_dataset.max_event_timestamp = ( + existing_saved_dataset_proto.meta.max_event_timestamp.ToDatetime() + ) + saved_dataset_proto = saved_dataset.to_proto() + saved_dataset_proto.spec.project = project del self.cached_registry_proto.saved_datasets[idx] break diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 4336db232fb..37da649c1c9 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -276,6 +276,7 @@ def apply_materialization( start_date_timestamp.FromDatetime(start_date) end_date_timestamp.FromDatetime(end_date) + # TODO: for this to work for stream feature views, ApplyMaterializationRequest needs to be updated request = RegistryServer_pb2.ApplyMaterializationRequest( feature_view=feature_view.to_proto(), project=project, diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 26f9da19e18..3ead3a2ec57 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -687,6 +687,24 @@ def _apply_object( obj.last_updated_timestamp = update_datetime if row: + if proto_field_name in [ + "entity_proto", + "saved_dataset_proto", + "feature_view_proto", + "feature_service_proto", + ]: + deserialized_proto = self.deserialize_registry_values( + row._mapping[proto_field_name], type(obj) + ) + obj.created_timestamp = ( + deserialized_proto.meta.created_timestamp.ToDatetime() + ) + if isinstance(obj, (FeatureView, StreamFeatureView)): + obj.update_materialization_intervals( + type(obj) + .from_proto(deserialized_proto) + .materialization_intervals + ) values = { proto_field_name: obj.to_proto().SerializeToString(), "last_updated_timestamp": update_time, diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 85038ad6ff3..444ae5797bb 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -3,6 +3,7 @@ import grpc from google.protobuf.empty_pb2 import Empty +from pytz import utc from feast import FeatureStore from feast.data_source import DataSource @@ -293,10 +294,10 @@ def ApplyMaterialization( feature_view=FeatureView.from_proto(request.feature_view), project=request.project, start_date=datetime.fromtimestamp( - request.start_date.seconds + request.start_date.nanos / 1e9 + request.start_date.seconds + request.start_date.nanos / 1e9, tz=utc ), end_date=datetime.fromtimestamp( - request.end_date.seconds + request.end_date.nanos / 1e9 + request.end_date.seconds + request.end_date.nanos / 1e9, tz=utc ), commit=request.commit, ) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 65d07aca45c..5469f3857aa 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -14,7 +14,7 @@ import logging import os import time -from datetime import timedelta +from datetime import datetime, timedelta from tempfile import mkstemp from unittest import mock @@ -22,12 +22,13 @@ import pandas as pd import pytest from pytest_lazyfixture import lazy_fixture +from pytz import utc from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.minio import MinioContainer from testcontainers.mysql import MySqlContainer -from feast import FileSource, RequestSource +from feast import FeatureService, FileSource, RequestSource from feast.data_format import AvroFormat, ParquetFormat from feast.data_source import KafkaSource from feast.entity import Entity @@ -308,6 +309,22 @@ def test_apply_entity_success(test_registry): # After the first apply, the created_timestamp should be the same as the last_update_timestamp. assert entity.created_timestamp == entity.last_updated_timestamp + # Update entity + updated_entity = Entity( + name="driver_car_id", + description="Car driver Id", + tags={"team": "matchmaking"}, + ) + test_registry.apply_entity(updated_entity, project) + + updated_entity = test_registry.get_entity("driver_car_id", project) + + # The created_timestamp for the entity should be set to the created_timestamp value stored from the previous apply + assert ( + updated_entity.created_timestamp is not None + and updated_entity.created_timestamp == entity.created_timestamp + ) + test_registry.delete_entity("driver_car_id", project) assert_project_uuid(project, project_uuid, test_registry) entities = test_registry.list_entities(project) @@ -601,11 +618,54 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data["odfv1_my_feature_2"] = feature_df["my_input_1"].astype("int32") return data + def simple_udf(x: int): + return x + 3 + + entity_sfv = Entity(name="sfv_my_entity_1", join_keys=["test_key"]) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + watermark_delay_threshold=timedelta(days=1), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity_sfv], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ), + Aggregation( + column="dummy_field2", + function="count", + time_window=timedelta(days=24), + ), + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + project = "project" # Register Feature Views test_registry.apply_feature_view(odfv1, project) test_registry.apply_feature_view(fv1, project) + test_registry.apply_feature_view(sfv, project) # Modify odfv by changing a single feature dtype @on_demand_feature_view( @@ -621,6 +681,8 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data["odfv1_my_feature_2"] = feature_df["my_input_1"].astype("int32") return data + existing_odfv = test_registry.get_on_demand_feature_view("odfv1", project) + # Apply the modified odfv test_registry.apply_feature_view(odfv1, project) @@ -655,6 +717,11 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and list(request_schema.values())[0] == ValueType.INT32 ) + assert ( + feature_view.created_timestamp is not None + and feature_view.created_timestamp == existing_odfv.created_timestamp + ) + # Make sure fv1 is untouched feature_views = test_registry.list_feature_views(project) @@ -675,7 +742,162 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and feature_view.entities[0] == "fs1_my_entity_1" ) - test_registry.teardown() + # Simulate materialization + current_date = datetime.utcnow() + end_date = current_date.replace(tzinfo=utc) + start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc) + test_registry.apply_materialization(feature_view, project, start_date, end_date) + materialized_feature_view = test_registry.get_feature_view( + "my_feature_view_1", project + ) + + # Check if created_timestamp, along with materialized_intervals are updated + assert ( + materialized_feature_view.created_timestamp is not None + and materialized_feature_view.created_timestamp + == feature_view.created_timestamp + and len(materialized_feature_view.materialization_intervals) > 0 + and materialized_feature_view.materialization_intervals[0][0] == start_date + and materialized_feature_view.materialization_intervals[0][1] == end_date + ) + + # Modify fv1 by changing a single dtype + updated_fv1 = FeatureView( + name="my_feature_view_1", + schema=[ + Field(name="test", dtype=Int64), + Field(name="fs1_my_feature_1", dtype=String), + ], + entities=[entity], + tags={"team": "matchmaking"}, + source=batch_source, + ttl=timedelta(minutes=5), + ) + + # Check that these fields are empty before apply + assert updated_fv1.created_timestamp is None + assert len(updated_fv1.materialization_intervals) == 0 + + # Apply the modified fv1 + test_registry.apply_feature_view(updated_fv1, project) + + # Verify feature view after modification + updated_feature_views = test_registry.list_feature_views(project) + + # List Feature Views + assert ( + len(updated_feature_views) == 1 + and updated_feature_views[0].name == "my_feature_view_1" + and updated_feature_views[0].features[0].name == "fs1_my_feature_1" + and updated_feature_views[0].features[0].dtype == String + and updated_feature_views[0].entities[0] == "fs1_my_entity_1" + ) + + updated_feature_view = test_registry.get_feature_view("my_feature_view_1", project) + assert ( + updated_feature_view.name == "my_feature_view_1" + and updated_feature_view.features[0].name == "fs1_my_feature_1" + and updated_feature_view.features[0].dtype == String + and updated_feature_view.entities[0] == "fs1_my_entity_1" + ) + + # Check if materialization_intervals and created_timestamp values propagates on each apply + # materialization_intervals will populate only when it's empty + assert ( + updated_feature_view.created_timestamp is not None + and updated_feature_view.created_timestamp == feature_view.created_timestamp + and len(updated_feature_view.materialization_intervals) == 1 + and updated_feature_view.materialization_intervals[0][0] == start_date + and updated_feature_view.materialization_intervals[0][1] == end_date + ) + + # Simulate materialization a second time + current_date = datetime.utcnow() + end_date_1 = current_date.replace(tzinfo=utc) + start_date_1 = (current_date - timedelta(days=1)).replace(tzinfo=utc) + test_registry.apply_materialization( + updated_feature_view, project, start_date_1, end_date_1 + ) + materialized_feature_view_1 = test_registry.get_feature_view( + "my_feature_view_1", project + ) + + assert ( + materialized_feature_view_1.created_timestamp is not None + and materialized_feature_view_1.created_timestamp + == feature_view.created_timestamp + and len(materialized_feature_view_1.materialization_intervals) == 2 + and materialized_feature_view_1.materialization_intervals[0][0] == start_date + and materialized_feature_view_1.materialization_intervals[0][1] == end_date + and materialized_feature_view_1.materialization_intervals[1][0] == start_date_1 + and materialized_feature_view_1.materialization_intervals[1][1] == end_date_1 + ) + + # Modify sfv by changing the dtype + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity_sfv], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=String)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ), + Aggregation( + column="dummy_field2", + function="count", + time_window=timedelta(days=24), + ), + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + + existing_sfv = test_registry.get_stream_feature_view( + "test kafka stream feature view", project + ) + # Apply the modified sfv + test_registry.apply_feature_view(sfv, project) + + # Verify feature view after modification + updated_stream_feature_views = test_registry.list_stream_feature_views(project) + + # List Feature Views + assert ( + len(updated_stream_feature_views) == 1 + and updated_stream_feature_views[0].name == "test kafka stream feature view" + and updated_stream_feature_views[0].features[0].name == "dummy_field" + and updated_stream_feature_views[0].features[0].dtype == String + and updated_stream_feature_views[0].entities[0] == "sfv_my_entity_1" + ) + + updated_sfv = test_registry.get_stream_feature_view( + "test kafka stream feature view", project + ) + assert ( + updated_sfv.name == "test kafka stream feature view" + and updated_sfv.features[0].name == "dummy_field" + and updated_sfv.features[0].dtype == String + and updated_sfv.entities[0] == "sfv_my_entity_1" + ) + + # The created_timestamp for the stream feature view should be set to the created_timestamp value stored from the + # previous apply + # Materialization_intervals is not set + assert ( + updated_sfv.created_timestamp is not None + and updated_sfv.created_timestamp == existing_sfv.created_timestamp + and len(updated_sfv.materialization_intervals) == 0 + ) @pytest.mark.integration @@ -824,7 +1046,7 @@ def simple_udf(x: int): project = "project" - # Register Feature View + # Register Stream Feature View test_registry.apply_feature_view(sfv, project) stream_feature_views = test_registry.list_stream_feature_views(project) @@ -840,6 +1062,100 @@ def simple_udf(x: int): test_registry.teardown() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_apply_feature_service_success(test_registry): + # Create Feature Service + file_source = FileSource(name="my_file_source", path="test.parquet") + feature_view = FeatureView( + name="my_feature_view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + fs = FeatureService( + name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] + ) + project = "project" + + # Register Feature Service + test_registry.apply_feature_service(fs, project) + + feature_services = test_registry.list_feature_services(project) + + # List Feature Services + assert len(feature_services) == 1 + assert feature_services[0] == fs + + # Delete Feature Service + test_registry.delete_feature_service("my_feature_service_1", project) + feature_services = test_registry.list_feature_services(project) + assert len(feature_services) == 0 + + test_registry.teardown() + + +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_modify_feature_service_success(test_registry): + # Create Feature Service + file_source = FileSource(name="my_file_source", path="test.parquet") + feature_view = FeatureView( + name="my_feature_view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + fs = FeatureService( + name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] + ) + project = "project" + + # Register Feature service + test_registry.apply_feature_service(fs, project) + + feature_services = test_registry.list_feature_services(project) + + # List Feature Services + assert len(feature_services) == 1 + assert feature_services[0] == fs + + # Modify Feature Service by removing a feature + fs = FeatureService( + name="my_feature_service_1", features=[feature_view[["feature1"]]] + ) + + # Apply modified Feature Service + test_registry.apply_feature_service(fs, project) + + updated_feature_services = test_registry.list_feature_services(project) + + # Verify Feature Services + assert len(updated_feature_services) == 1 + assert updated_feature_services[0] == fs + # The created_timestamp for the feature service should be set to the created_timestamp value stored from the + # previous apply + assert ( + updated_feature_services[0].created_timestamp is not None + and updated_feature_services[0].created_timestamp + == feature_services[0].created_timestamp + ) + + test_registry.teardown() + + @pytest.mark.integration def test_commit(): fd, registry_path = mkstemp() diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index b3e6762c17d..0ce3fad98de 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -321,6 +321,20 @@ def test_reapply_feature_view(test_feature_store, dataframe_source): # Check Feature View fv_stored = test_feature_store.get_feature_view(fv1.name) + assert len(fv_stored.materialization_intervals) == 1 + + # Change and apply Feature View, this time, only the name + fv2 = FeatureView( + name="my_feature_view_2", + schema=[Field(name="int64_col", dtype=Int64)], + entities=[e], + source=file_source, + ttl=timedelta(minutes=5), + ) + test_feature_store.apply([fv2]) + + # Check Feature View + fv_stored = test_feature_store.get_feature_view(fv2.name) assert len(fv_stored.materialization_intervals) == 0 test_feature_store.teardown() diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 0220d1a8a95..b387f55d8b0 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,8 +1,9 @@ -from datetime import timedelta +from datetime import datetime, timedelta import pytest from typeguard import TypeCheckError +from feast import utils from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat from feast.data_source import KafkaSource @@ -117,3 +118,53 @@ def test_hash(): def test_field_types(): with pytest.raises(TypeCheckError): Field(name="name", dtype=ValueType.INT32) + + +def test_update_materialization_intervals(): + batch_source = FileSource(path="some path") + entity = Entity(name="entity_1", description="Some entity") + # Create a feature view that is already present in the SQL registry + stored_feature_view = FeatureView( + name="my-feature-view", + entities=[entity], + ttl=timedelta(days=1), + source=batch_source, + ) + + # Update the Feature View without modifying anything + updated_feature_view = FeatureView( + name="my-feature-view", + entities=[entity], + ttl=timedelta(days=1), + source=batch_source, + ) + updated_feature_view.update_materialization_intervals( + stored_feature_view.materialization_intervals + ) + assert len(updated_feature_view.materialization_intervals) == 0 + + current_time = datetime.utcnow() + start_date = utils.make_tzaware(current_time - timedelta(days=1)) + end_date = utils.make_tzaware(current_time) + updated_feature_view.materialization_intervals.append((start_date, end_date)) + + # Update the Feature View, i.e. simply update the name + second_updated_feature_view = FeatureView( + name="my-feature-view-1", + entities=[entity], + ttl=timedelta(days=1), + source=batch_source, + ) + + second_updated_feature_view.update_materialization_intervals( + updated_feature_view.materialization_intervals + ) + assert len(second_updated_feature_view.materialization_intervals) == 1 + assert ( + second_updated_feature_view.materialization_intervals[0][0] + == updated_feature_view.materialization_intervals[0][0] + ) + assert ( + second_updated_feature_view.materialization_intervals[0][1] + == updated_feature_view.materialization_intervals[0][1] + ) diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py index b53f9a593ae..77431666c30 100644 --- a/sdk/python/tests/unit/test_stream_feature_view.py +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -1,8 +1,9 @@ import copy -from datetime import timedelta +from datetime import datetime, timedelta import pytest +from feast import utils from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat @@ -250,3 +251,82 @@ def test_stream_feature_view_copy(): aggregations=[], ) assert sfv == copy.copy(sfv) + + +def test_update_materialization_intervals(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + # Create a stream feature view that is already present in the SQL registry + stored_stream_feature_view = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + current_time = datetime.utcnow() + start_date = utils.make_tzaware(current_time - timedelta(days=1)) + end_date = utils.make_tzaware(current_time) + stored_stream_feature_view.materialization_intervals.append((start_date, end_date)) + + # Update the stream feature view i.e. here it's simply the name + updated_stream_feature_view = StreamFeatureView( + name="test kafka stream feature view updated", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + + updated_stream_feature_view.update_materialization_intervals( + stored_stream_feature_view.materialization_intervals + ) + + assert ( + updated_stream_feature_view.materialization_intervals is not None + and len(stored_stream_feature_view.materialization_intervals) == 1 + ) + assert ( + updated_stream_feature_view.materialization_intervals[0][0] + == stored_stream_feature_view.materialization_intervals[0][0] + ) + assert ( + updated_stream_feature_view.materialization_intervals[0][1] + == stored_stream_feature_view.materialization_intervals[0][1] + ) diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index 885798db109..d9104bae420 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -78,6 +78,16 @@ def validate_offline_online_store_consistency( # run materialize_incremental() fs.materialize_incremental(feature_views=[fv.name], end_date=now) + updated_fv = fs.registry.get_feature_view(fv.name, fs.project) + + # Check if materialization_intervals was updated by the registry + assert ( + len(updated_fv.materialization_intervals) == 2 + and updated_fv.materialization_intervals[0][0] == start_date + and updated_fv.materialization_intervals[0][1] == end_date + and updated_fv.materialization_intervals[1][0] == end_date + and updated_fv.materialization_intervals[1][1] == now.replace(tzinfo=utc) + ) # check result of materialize_incremental() _check_offline_and_online_features(