From 04b9867f8bab3a270858c46af67cfd99e9247a9d Mon Sep 17 00:00:00 2001 From: msistla96 Date: Wed, 5 Jun 2024 12:05:54 -0500 Subject: [PATCH 1/6] fix to update feast object metadata in the sql registry Signed-off-by: msistla96 --- Created_timestamp_fix.patch | 626 ++++++++++++++++++ sdk/python/feast/base_feature_view.py | 3 + sdk/python/feast/entity.py | 4 + sdk/python/feast/feature_service.py | 6 + sdk/python/feast/feature_view.py | 9 + sdk/python/feast/infra/registry/sql.py | 7 + sdk/python/feast/on_demand_feature_view.py | 6 + sdk/python/feast/saved_dataset.py | 10 + sdk/python/feast/stream_feature_view.py | 14 + .../registration/test_universal_registry.py | 247 ++++++- sdk/python/tests/unit/test_entity.py | 24 + sdk/python/tests/unit/test_feature_service.py | 38 ++ sdk/python/tests/unit/test_feature_views.py | 49 +- .../tests/unit/test_on_demand_feature_view.py | 59 +- .../tests/unit/test_stream_feature_view.py | 91 ++- sdk/python/tests/utils/e2e_test_validation.py | 8 + 16 files changed, 1196 insertions(+), 5 deletions(-) create mode 100644 Created_timestamp_fix.patch diff --git a/Created_timestamp_fix.patch b/Created_timestamp_fix.patch new file mode 100644 index 00000000000..98c9248a066 --- /dev/null +++ b/Created_timestamp_fix.patch @@ -0,0 +1,626 @@ +Subject: [PATCH] Created_timestamp fix +--- +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt b/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt +@@ -529,7 +529,7 @@ + # mypy + mypy-protobuf==3.1 + # via feast (setup.py) +-mysqlclient==2.1.1 ++mysqlclient==2.2.3 + # via feast (setup.py) + nbclassic==1.0.0 + # via notebook +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py +@@ -221,6 +221,12 @@ + + return fs + ++ def update_meta(self, stored_proto): ++ feature_service_proto = self.FromString(stored_proto["feature_service_proto"]) ++ self.created_timestamp = ( ++ feature_service_proto.meta.created_timestamp.ToDatetime() ++ ) ++ + def to_proto(self) -> FeatureServiceProto: + """ + Converts a feature service to its protobuf representation. +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py +@@ -4,6 +4,13 @@ + from feast.infra.offline_stores.file_source import FileSource + from feast.types import Float32 + from tests.utils.test_wrappers import no_warnings ++from feast.protos.feast.core.FeatureService_pb2 import FeatureService as FeatureServiceProto ++from feast.protos.feast.core.FeatureService_pb2 import ( ++ FeatureServiceMeta as FeatureServiceMetaProto, ++) ++from feast.protos.feast.core.FeatureService_pb2 import ( ++ FeatureServiceSpec as FeatureServiceSpecProto, ++) + + + def test_feature_service_with_description(): +@@ -72,3 +79,13 @@ + _ = FeatureService( + name="my-feature-service", features=[feature_view[["feature1", "feature2"]]] + ) ++def test_update_meta(): ++ # Create a feature service with no materialization intervals ++ feature_service = FeatureService(name="test_feature_service", features=[]) ++ feature_service_proto = FeatureServiceProto( ++ spec=FeatureServiceSpecProto(name="test_feature_service", features=[]), ++ meta=FeatureServiceMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp()))) ++ ) ++ stored_proto = {"feature_service_proto": feature_service_proto.SerializeToString()} ++ feature_service.update_meta(stored_proto) ++ assert feature_service.created_timestamp is not None +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py +@@ -169,6 +169,12 @@ + + return entity + ++ def update_meta(self, stored_proto): ++ entity_proto = self.FromString(stored_proto["entity_proto"]) ++ self.created_timestamp = ( ++ entity_proto.meta.created_timestamp.ToDatetime() ++ ) ++ + def to_proto(self) -> EntityProto: + """ + Converts an entity object to its protobuf representation. +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py +@@ -998,6 +998,8 @@ + obj.last_updated_timestamp = update_datetime + + if row: ++ if proto_field_name in ["entity_proto","feature_view_proto","feature_service_proto"]: ++ obj.update_meta(row) + values = { + proto_field_name: obj.to_proto().SerializeToString(), + "last_updated_timestamp": update_time, +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py +@@ -1,17 +1,34 @@ +-from datetime import timedelta ++from datetime import timedelta, datetime + + import pytest ++from google.protobuf.internal.well_known_types import Timestamp + from pydantic import ValidationError + + from feast.aggregation import Aggregation + from feast.batch_feature_view import BatchFeatureView +-from feast.data_format import AvroFormat ++from feast.data_format import AvroFormat, ParquetFormat + from feast.data_source import KafkaSource, PushSource + from feast.entity import Entity + from feast.feature_view import FeatureView + from feast.field import Field + from feast.infra.offline_stores.file_source import FileSource + from feast.protos.feast.types.Value_pb2 import ValueType ++from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto ++from feast.protos.feast.core.FeatureView_pb2 import ( ++ FeatureViewMeta as FeatureViewMetaProto, ++) ++from feast.protos.feast.core.FeatureView_pb2 import ( ++ FeatureViewSpec as FeatureViewSpecProto, ++) ++from feast.protos.feast.core.FeatureView_pb2 import ( ++ MaterializationInterval as MaterializationIntervalProto, ++) ++from feast.protos.feast.core.StreamFeatureView_pb2 import ( ++ StreamFeatureView as StreamFeatureViewProto, ++) ++from feast.protos.feast.core.StreamFeatureView_pb2 import ( ++ StreamFeatureViewSpec as StreamFeatureViewSpecProto, ++) + from feast.stream_feature_view import StreamFeatureView, stream_feature_view + from feast.types import Float32 + +@@ -278,3 +295,94 @@ + def test_field_types(): + with pytest.raises(ValidationError): + Field(name="name", dtype=ValueType.INT32) ++ ++ ++def test_update_meta_with_feature_view(): ++ # Create a feature view with no materialization intervals ++ feature_view = FeatureView(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1), ++ source="test_source") ++ feature_view_proto = FeatureViewProto( ++ spec=FeatureViewSpecProto(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), ++ source="test_source"), ++ meta=FeatureViewMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), ++ materialization_intervals=[]) ++ ) ++ stored_proto = {"feature_view_proto": feature_view_proto.SerializeToString()} ++ feature_view.update_meta(stored_proto) ++ assert feature_view.created_timestamp is not None ++ assert feature_view.materialization_intervals is None ++ ++ # Simulate feature view getting materialized, i.e: materialization intervals is not empty ++ batch_source = FileSource(path="some path") ++ updated_feature_view = FeatureView(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1), ++ source=batch_source) ++ start_date = datetime.now() - timedelta(days=1) ++ end_date = datetime.now() ++ start_time = Timestamp(seconds=int(start_date).timestamp()) ++ end_time = Timestamp(seconds=int(end_date.timestamp())) ++ updated_feature_view_proto = FeatureViewProto( ++ spec=FeatureViewSpecProto(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), ++ source="test_source"), ++ meta=FeatureViewMetaProto( ++ created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), ++ materialization_intervals=[ ++ MaterializationIntervalProto( ++ start_time=start_time, ++ end_time=end_time ++ ) ++ ] ++ ) ++ ) ++ stored_proto = {"feature_view_proto": updated_feature_view_proto.SerializeToString()} ++ updated_feature_view.update_meta(stored_proto) ++ assert feature_view.created_timestamp is not None ++ assert feature_view.last_updated_timestamp is not None ++ assert feature_view.materialization_intervals is not None and len(feature_view.materialization_intervals) == 1 ++ assert feature_view.materialization_intervals[0][0] == start_date ++ assert feature_view.materialization_intervals[0][1] == end_date ++ ++ ++def test_update_meta_with_stream_feature_view(): ++ # Create a stream feature view with no materialization intervals ++ batch_source = FileSource(path="some path") ++ stream_feature_view = StreamFeatureView(name="test_stream_feature_view", entities=["entity1"], ++ ttl=timedelta(days=1), source=batch_source) ++ stream_feature_view_proto = StreamFeatureViewProto( ++ spec=StreamFeatureViewSpecProto(name="test_stream_feature_view", entities=["entity1"], ++ ttl=timedelta(days=1).total_seconds(), source="test_source"), ++ meta=FeatureViewMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), ++ materialization_intervals=[]) ++ ) ++ stored_proto = {"stream_feature_view_proto": stream_feature_view_proto.SerializeToString()} ++ stream_feature_view.update_meta(stored_proto) ++ assert stream_feature_view.created_timestamp is not None ++ assert stream_feature_view.materialization_intervals is None ++ ++ # Simulate stream feature view getting materialized, i.e: materialization intervals is not empty ++ updated_stream_feature_view = StreamFeatureView(name="test_stream_feature_view", entities=["entity1"], ++ ttl=timedelta(days=1), source="test_source") ++ start_date = datetime.now() - timedelta(days=1) ++ end_date = datetime.now() ++ start_time = Timestamp(seconds=int(start_date).timestamp()) ++ end_time = Timestamp(seconds=int(end_date.timestamp())) ++ updated_stream_feature_view_proto = StreamFeatureViewProto( ++ spec=StreamFeatureViewSpecProto(name="test_stream_feature_view", entities=["entity1"], ++ ttl=timedelta(days=1).total_seconds(), source="test_source"), ++ meta=FeatureViewMetaProto( ++ created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), ++ materialization_intervals=[ ++ MaterializationIntervalProto( ++ start_time=start_time, ++ end_time=end_time ++ ) ++ ] ++ ) ++ ) ++ stored_proto = {"stream_feature_view_proto": updated_stream_feature_view_proto.SerializeToString()} ++ updated_stream_feature_view.update_meta(stored_proto) ++ assert updated_stream_feature_view.created_timestamp is not None ++ assert updated_stream_feature_view.last_updated_timestamp is not None ++ assert updated_stream_feature_view.materialization_intervals is not None and len( ++ updated_stream_feature_view.materialization_intervals) == 1 ++ assert updated_stream_feature_view.materialization_intervals[0][0] == start_date ++ assert updated_stream_feature_view.materialization_intervals[0][1] == end_date +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py +@@ -140,6 +140,17 @@ + + return True + ++ def update_meta(self, stored_proto): ++ saved_dataset_proto = self.FromString(stored_proto["saved_dataset_proto"]) ++ self.created_timestamp = ( ++ saved_dataset_proto.meta.created_timestamp.ToDatetime() ++ ) ++ self.min_event_timestamp = ( ++ saved_dataset_proto.meta.min_event_timestamp.ToDatetime() ++ ) ++ self.max_event_timestamp = ( ++ saved_dataset_proto.meta.max_event_timestamp.ToDatetime() ++ ) + @staticmethod + def from_proto(saved_dataset_proto: SavedDatasetProto): + """ +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py +@@ -16,7 +16,13 @@ + + from feast.entity import Entity + from feast.value_type import ValueType +- ++from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto ++from feast.protos.feast.core.Entity_pb2 import ( ++ EntityMeta as EntityMetaProto, ++) ++from feast.protos.feast.core.Entity_pb2 import ( ++ EntitySpecV2 as EntitySpecProto, ++) + + def test_join_key_default(): + entity = Entity(name="my-entity", description="My entity") +@@ -73,3 +79,14 @@ + + s4 = {entity1, entity2, entity3, entity4} + assert len(s4) == 3 ++ ++def test_update_meta_with_entity(): ++ # Create an entity with no materialization intervals ++ entity = Entity(name="test_entity", join_keys=["key"], value_type=ValueType.INT32) ++ entity_proto = EntityProto( ++ spec=EntitySpecProto(name="test_entity", value_type=ValueType.INT32.value, join_key="key"), ++ meta=EntityMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp()))) ++ ) ++ stored_proto = {"entity_proto": entity_proto.SerializeToString()} ++ entity.update_meta(stored_proto) ++ assert entity.created_timestamp is not None +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py +@@ -74,22 +74,22 @@ + udf_string: Optional[str] + + def __init__( +- self, +- *, +- name: str, +- source: DataSource, +- entities: Optional[Union[List[Entity], List[str]]] = None, +- ttl: timedelta = timedelta(days=0), +- tags: Optional[Dict[str, str]] = None, +- online: Optional[bool] = True, +- description: Optional[str] = "", +- owner: Optional[str] = "", +- schema: Optional[List[Field]] = None, +- aggregations: Optional[List[Aggregation]] = None, +- mode: Optional[str] = "spark", +- timestamp_field: Optional[str] = "", +- udf: Optional[FunctionType] = None, +- udf_string: Optional[str] = "", ++ self, ++ *, ++ name: str, ++ source: DataSource, ++ entities: Optional[Union[List[Entity], List[str]]] = None, ++ ttl: timedelta = timedelta(days=0), ++ tags: Optional[Dict[str, str]] = None, ++ online: Optional[bool] = True, ++ description: Optional[str] = "", ++ owner: Optional[str] = "", ++ schema: Optional[List[Field]] = None, ++ aggregations: Optional[List[Aggregation]] = None, ++ mode: Optional[str] = "spark", ++ timestamp_field: Optional[str] = "", ++ udf: Optional[FunctionType] = None, ++ udf_string: Optional[str] = "", + ): + if not flags_helper.is_test(): + warnings.warn( +@@ -99,8 +99,8 @@ + ) + + if ( +- type(source).__name__ not in SUPPORTED_STREAM_SOURCES +- and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE ++ type(source).__name__ not in SUPPORTED_STREAM_SOURCES ++ and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE + ): + raise ValueError( + f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} " +@@ -143,11 +143,11 @@ + return False + + if ( +- self.mode != other.mode +- or self.timestamp_field != other.timestamp_field +- or self.udf.__code__.co_code != other.udf.__code__.co_code +- or self.udf_string != other.udf_string +- or self.aggregations != other.aggregations ++ self.mode != other.mode ++ or self.timestamp_field != other.timestamp_field ++ or self.udf.__code__.co_code != other.udf.__code__.co_code ++ or self.udf_string != other.udf_string ++ or self.aggregations != other.aggregations + ): + return False + +@@ -156,6 +156,20 @@ + def __hash__(self) -> int: + return super().__hash__() + ++ def update_meta(self, stored_proto): ++ stream_feature_view_proto = self.FromString(stored_proto["feature_view_proto"]) ++ self.created_timestamp = ( ++ stream_feature_view_proto.meta.created_timestamp.ToDatetime() ++ ) ++ ++ for interval in stream_feature_view_proto.meta.materialization_intervals: ++ self.materialization_intervals.append( ++ ( ++ utils.make_tzaware(interval.start_time.ToDatetime()), ++ utils.make_tzaware(interval.end_time.ToDatetime()), ++ ) ++ ) ++ + def to_proto(self): + meta = self.to_proto_meta() + ttl_duration = self.get_ttl_duration() +@@ -301,18 +315,18 @@ + + + def stream_feature_view( +- *, +- entities: Optional[Union[List[Entity], List[str]]] = None, +- ttl: Optional[timedelta] = None, +- tags: Optional[Dict[str, str]] = None, +- online: Optional[bool] = True, +- description: Optional[str] = "", +- owner: Optional[str] = "", +- schema: Optional[List[Field]] = None, +- source: Optional[DataSource] = None, +- aggregations: Optional[List[Aggregation]] = None, +- mode: Optional[str] = "spark", +- timestamp_field: Optional[str] = "", ++ *, ++ entities: Optional[Union[List[Entity], List[str]]] = None, ++ ttl: Optional[timedelta] = None, ++ tags: Optional[Dict[str, str]] = None, ++ online: Optional[bool] = True, ++ description: Optional[str] = "", ++ owner: Optional[str] = "", ++ schema: Optional[List[Field]] = None, ++ source: Optional[DataSource] = None, ++ aggregations: Optional[List[Aggregation]] = None, ++ mode: Optional[str] = "spark", ++ timestamp_field: Optional[str] = "", + ): + """ + Creates an StreamFeatureView object with the given user function as udf. +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py +@@ -171,6 +171,12 @@ + def __hash__(self): + return super().__hash__() + ++ def update_meta(self, stored_proto): ++ on_demand_feature_view_proto = self.FromString(stored_proto["feature_view_proto"]) ++ self.created_timestamp = ( ++ on_demand_feature_view_proto.meta.created_timestamp.ToDatetime() ++ ) ++ + def to_proto(self) -> OnDemandFeatureViewProto: + """ + Converts an on demand feature view object to its protobuf representation. +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py +@@ -51,13 +51,13 @@ + + @abstractmethod + def __init__( +- self, +- *, +- name: str, +- features: Optional[List[Field]] = None, +- description: str = "", +- tags: Optional[Dict[str, str]] = None, +- owner: str = "", ++ self, ++ *, ++ name: str, ++ features: Optional[List[Field]] = None, ++ description: str = "", ++ tags: Optional[Dict[str, str]] = None, ++ owner: str = "", + ): + """ + Creates a BaseFeatureView object. +@@ -140,12 +140,12 @@ + ) + + if ( +- self.name != other.name +- or sorted(self.features) != sorted(other.features) +- or self.projection != other.projection +- or self.description != other.description +- or self.tags != other.tags +- or self.owner != other.owner ++ self.name != other.name ++ or sorted(self.features) != sorted(other.features) ++ or self.projection != other.projection ++ or self.description != other.description ++ or self.tags != other.tags ++ or self.owner != other.owner + ): + return False + +@@ -229,3 +229,6 @@ + cp.projection = feature_view_projection + + return cp ++ ++ def update_obj_meta(self, row): ++ pass +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py +@@ -11,6 +11,7 @@ + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. ++from datetime import timedelta + + import pandas as pd + +@@ -19,7 +20,16 @@ + from feast.infra.offline_stores.file_source import FileSource + from feast.on_demand_feature_view import OnDemandFeatureView + from feast.types import Float32 +- ++from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureView as OnDemandFeatureViewProto ++from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( ++ OnDemandFeatureViewMeta as OnDemandFeatureViewMetaProto, ++) ++from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( ++ OnDemandFeatureViewSpec as OnDemandFeatureViewSpecProto, ++) ++from feast.protos.feast.core.FeatureView_pb2 import ( ++ MaterializationInterval as MaterializationIntervalProto, ++) + + def udf1(features_df: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() +@@ -105,3 +115,42 @@ + on_demand_feature_view_4, + } + assert len(s4) == 3 ++ ++def test_update_meta(): ++ # Create an on demand feature view with no materialization intervals ++ on_demand_feature_view = OnDemandFeatureView(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1), source="test_source") ++ on_demand_feature_view_proto = OnDemandFeatureViewProto( ++ spec=OnDemandFeatureViewSpecProto(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), source="test_source"), ++ meta=OnDemandFeatureViewMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), ++ materialization_intervals=[]) ++ ) ++ stored_proto = {"on_demand_feature_view_proto": on_demand_feature_view_proto.SerializeToString()} ++ on_demand_feature_view.update_meta(stored_proto) ++ assert on_demand_feature_view.created_timestamp is not None ++ assert on_demand_feature_view.materialization_intervals is None ++ ++ # Simulate on demand feature view getting materialized, i.e: materialization intervals is not empty ++ updated_on_demand_feature_view = OnDemandFeatureView(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1), source="test_source") ++ start_date = datetime.now() - timedelta(days=1) ++ end_date = datetime.now() ++ start_time = Timestamp(seconds=int(start_date).timestamp()) ++ end_time= Timestamp(seconds=int(end_date.timestamp())) ++ updated_on_demand_feature_view_proto = OnDemandFeatureViewProto( ++ spec=OnDemandFeatureViewSpecProto(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), source="test_source"), ++ meta=OnDemandFeatureViewMetaProto( ++ created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), ++ materialization_intervals=[ ++ MaterializationIntervalProto( ++ start_time=start_time, ++ end_time=end_time ++ ) ++ ] ++ ) ++ ) ++ stored_proto = {"on_demand_feature_view_proto": updated_on_demand_feature_view_proto.SerializeToString()} ++ updated_on_demand_feature_view.update_meta(stored_proto) ++ assert updated_on_demand_feature_view.created_timestamp is not None ++ assert updated_on_demand_feature_view.last_updated_timestamp is not None ++ assert updated_on_demand_feature_view.materialization_intervals is not None and len(updated_on_demand_feature_view.materialization_intervals) == 1 ++ assert updated_on_demand_feature_view.materialization_intervals[0][0] == start_date ++ assert updated_on_demand_feature_view.materialization_intervals[0][1] == end_date +Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py +--- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py ++++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py +@@ -332,6 +332,19 @@ + + return cp + ++ def update_meta(self, stored_proto): ++ feature_view_proto = self.FromString(stored_proto["feature_view_proto"]) ++ self.created_timestamp = ( ++ feature_view_proto.meta.created_timestamp.ToDatetime() ++ ) ++ ++ for interval in feature_view_proto.meta.materialization_intervals: ++ self.materialization_intervals.append( ++ ( ++ utils.make_tzaware(interval.start_time.ToDatetime()), ++ utils.make_tzaware(interval.end_time.ToDatetime()), ++ ) ++ ) + def to_proto(self) -> FeatureViewProto: + """ + Converts a feature view object to its protobuf representation. diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 31140e28999..3e141d36972 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -238,3 +238,6 @@ def with_projection(self, feature_view_projection: FeatureViewProjection): cp.projection = feature_view_projection return cp + + def update_meta(self, serialized_proto: bytes): + pass diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index a988c200d7c..521ed0ccf41 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -167,6 +167,10 @@ def from_proto(cls, entity_proto: EntityProto): return entity + def update_meta(self, stored_proto: bytes): + entity_proto = EntityProto.FromString(stored_proto) + self.created_timestamp = entity_proto.meta.created_timestamp.ToDatetime() + def to_proto(self) -> EntityProto: """ Converts an entity object to its protobuf representation. diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 8b8cbac8ea2..4860bd08ccd 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -219,6 +219,12 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto): return fs + def update_meta(self, stored_proto: bytes): + feature_service_proto = FeatureServiceProto.FromString(stored_proto) + self.created_timestamp = ( + feature_service_proto.meta.created_timestamp.ToDatetime() + ) + def to_proto(self) -> FeatureServiceProto: """ Converts a feature service to its protobuf representation. diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index ff41400eace..11b826c6de2 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -311,6 +311,15 @@ def with_join_key_map(self, join_key_map: Dict[str, str]): return cp + def update_meta(self, stored_proto: bytes): + feature_view_proto = FeatureViewProto.FromString(stored_proto) + self.created_timestamp = feature_view_proto.meta.created_timestamp.ToDatetime() + + for interval in feature_view_proto.meta.materialization_intervals: + self.materialization_intervals.append( + (interval.start_time.ToDatetime(), interval.end_time.ToDatetime()) + ) + def to_proto(self) -> FeatureViewProto: """ Converts a feature view object to its protobuf representation. diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 26f9da19e18..7e747889b38 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -687,6 +687,13 @@ def _apply_object( obj.last_updated_timestamp = update_datetime if row: + if proto_field_name in [ + "entity_proto", + "saved_dataset_proto", + "feature_view_proto", + "feature_service_proto", + ]: + obj.update_meta(row._mapping[proto_field_name]) values = { proto_field_name: obj.to_proto().SerializeToString(), "last_updated_timestamp": update_time, diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 839ce4d64ca..afda73710d4 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -201,6 +201,12 @@ def __eq__(self, other): def __hash__(self): return super().__hash__() + def update_meta(self, stored_proto: bytes): + on_demand_feature_view_proto = OnDemandFeatureViewProto.FromString(stored_proto) + self.created_timestamp = ( + on_demand_feature_view_proto.meta.created_timestamp.ToDatetime() + ) + def to_proto(self) -> OnDemandFeatureViewProto: """ Converts an on demand feature view object to its protobuf representation. diff --git a/sdk/python/feast/saved_dataset.py b/sdk/python/feast/saved_dataset.py index 4a3043a8731..aafd5d4db74 100644 --- a/sdk/python/feast/saved_dataset.py +++ b/sdk/python/feast/saved_dataset.py @@ -140,6 +140,16 @@ def __eq__(self, other): return True + def update_meta(self, stored_proto: bytes): + saved_dataset_proto = SavedDatasetProto.FromString(stored_proto) + self.created_timestamp = saved_dataset_proto.meta.created_timestamp.ToDatetime() + self.min_event_timestamp = ( + saved_dataset_proto.meta.min_event_timestamp.ToDatetime() + ) + self.max_event_timestamp = ( + saved_dataset_proto.meta.max_event_timestamp.ToDatetime() + ) + @staticmethod def from_proto(saved_dataset_proto: SavedDatasetProto): """ diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 50e1a221456..47a40165413 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -167,6 +167,20 @@ def __eq__(self, other): def __hash__(self) -> int: return super().__hash__() + def update_meta(self, stored_proto: bytes): + stream_feature_view_proto = StreamFeatureViewProto.FromString(stored_proto) + self.created_timestamp = ( + stream_feature_view_proto.meta.created_timestamp.ToDatetime() + ) + + for interval in stream_feature_view_proto.meta.materialization_intervals: + self.materialization_intervals.append( + ( + interval.start_time.ToDatetime(), + interval.end_time.ToDatetime(), + ) + ) + def to_proto(self): meta = self.to_proto_meta() ttl_duration = self.get_ttl_duration() diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 65d07aca45c..f69c4987f1a 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -27,7 +27,7 @@ from testcontainers.minio import MinioContainer from testcontainers.mysql import MySqlContainer -from feast import FileSource, RequestSource +from feast import FeatureService, FileSource, RequestSource from feast.data_format import AvroFormat, ParquetFormat from feast.data_source import KafkaSource from feast.entity import Entity @@ -308,6 +308,13 @@ def test_apply_entity_success(test_registry): # After the first apply, the created_timestamp should be the same as the last_update_timestamp. assert entity.created_timestamp == entity.last_updated_timestamp + # Update entity + entity.description = "Car driver ID" + test_registry.apply_entity(entity, project) + + # The created_timestamp for the entity should be set to the created_timestamp value stored from the previous apply + assert entity.created_timestamp is not None + test_registry.delete_entity("driver_car_id", project) assert_project_uuid(project, project_uuid, test_registry) entities = test_registry.list_entities(project) @@ -601,11 +608,54 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data["odfv1_my_feature_2"] = feature_df["my_input_1"].astype("int32") return data + def simple_udf(x: int): + return x + 3 + + entity_sfv = Entity(name="sfv_my_entity_1", join_keys=["test_key"]) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + watermark_delay_threshold=timedelta(days=1), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity_sfv], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ), + Aggregation( + column="dummy_field2", + function="count", + time_window=timedelta(days=24), + ), + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + project = "project" # Register Feature Views test_registry.apply_feature_view(odfv1, project) test_registry.apply_feature_view(fv1, project) + test_registry.apply_feature_view(sfv, project) # Modify odfv by changing a single feature dtype @on_demand_feature_view( @@ -675,6 +725,105 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and feature_view.entities[0] == "fs1_my_entity_1" ) + # Modify fv1 by changing a single dtype + fv1 = FeatureView( + name="my_feature_view_1", + schema=[ + Field(name="test", dtype=Int64), + Field(name="fs1_my_feature_1", dtype=String), + ], + entities=[entity], + tags={"team": "matchmaking"}, + source=batch_source, + ttl=timedelta(minutes=5), + ) + + # Apply the modified fv1 + test_registry.apply_feature_view(fv1, project) + + # Verify feature view after modification + feature_views = test_registry.list_feature_views(project) + + # List Feature Views + assert ( + len(feature_views) == 1 + and feature_views[0].name == "my_feature_view_1" + and feature_views[0].features[0].name == "fs1_my_feature_1" + and feature_views[0].features[0].dtype == String + and feature_views[0].entities[0] == "fs1_my_entity_1" + ) + + feature_view = test_registry.get_feature_view("my_feature_view_1", project) + assert ( + feature_view.name == "my_feature_view_1" + and feature_view.features[0].name == "fs1_my_feature_1" + and feature_view.features[0].dtype == String + and feature_view.entities[0] == "fs1_my_entity_1" + ) + + # The created_timestamp for the feature view should be set to the created_timestamp value stored from the + # previous apply + + assert feature_view.created_timestamp is not None + + # Modify sfv by changing the dtype + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity_sfv], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=String)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ), + Aggregation( + column="dummy_field2", + function="count", + time_window=timedelta(days=24), + ), + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + + # Apply the modified sfv + test_registry.apply_feature_view(sfv, project) + + # Verify feature view after modification + feature_views = test_registry.list_stream_feature_views(project) + + # List Feature Views + assert ( + len(feature_views) == 1 + and feature_views[0].name == "test kafka stream feature view" + and feature_views[0].features[0].name == "dummy_field" + and feature_views[0].features[0].dtype == String + and feature_views[0].entities[0] == "sfv_my_entity_1" + ) + + feature_view = test_registry.get_stream_feature_view( + "test kafka stream feature view", project + ) + assert ( + feature_view.name == "test kafka stream feature view" + and feature_view.features[0].name == "dummy_field" + and feature_view.features[0].dtype == String + and feature_view.entities[0] == "sfv_my_entity_1" + ) + + # The created_timestamp for the stream feature view should be set to the created_timestamp value stored from the + # previous apply + assert feature_view.created_timestamp is not None + test_registry.teardown() @@ -824,7 +973,7 @@ def simple_udf(x: int): project = "project" - # Register Feature View + # Register Stream Feature View test_registry.apply_feature_view(sfv, project) stream_feature_views = test_registry.list_stream_feature_views(project) @@ -840,6 +989,100 @@ def simple_udf(x: int): test_registry.teardown() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_apply_feature_service_success(test_registry): + # Create Feature Service + file_source = FileSource(name="my_file_source", path="test.parquet") + feature_view = FeatureView( + name="my_feature_view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + fs = FeatureService( + name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] + ) + project = "project" + + # Register Feature Service + test_registry.apply_feature_service(fs, project) + + feature_services = test_registry.list_feature_services(project) + + # List Feature Services + assert len(feature_services) == 1 + assert feature_services[0] == fs + + # Delete Feature Service + test_registry.delete_feature_service("my_feature_service_1", project) + feature_services = test_registry.list_feature_services(project) + assert len(feature_services) == 0 + + test_registry.teardown() + + +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_modify_feature_service_success(test_registry): + # Create Feature Service + file_source = FileSource(name="my_file_source", path="test.parquet") + feature_view = FeatureView( + name="my_feature_view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + fs = FeatureService( + name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] + ) + project = "project" + + # Register Feature service + test_registry.apply_feature_service(fs, project) + + feature_services = test_registry.list_feature_services(project) + + # List Feature Services + assert len(feature_services) == 1 + assert feature_services[0] == fs + + test_registry.delete_feature_service("my_feature_service_1", project) + feature_services = test_registry.list_feature_services(project) + assert len(feature_services) == 0 + + # Modify Feature Service by removing a feature + fs = FeatureService( + name="my_feature_service_1", features=[feature_view[["feature1"]]] + ) + + # Apply modified Feature Service + test_registry.apply_feature_service(fs, project) + + feature_services = test_registry.list_feature_services(project) + + # Verify Feature Services + assert len(feature_services) == 1 + assert feature_services[0] == fs + # The created_timestamp for the feature service should be set to the created_timestamp value stored from the + # previous apply + assert feature_services[0].created_timestamp is not None + + test_registry.teardown() + + @pytest.mark.integration def test_commit(): fd, registry_path = mkstemp() diff --git a/sdk/python/tests/unit/test_entity.py b/sdk/python/tests/unit/test_entity.py index 78f71231049..969720f7f53 100644 --- a/sdk/python/tests/unit/test_entity.py +++ b/sdk/python/tests/unit/test_entity.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime, timedelta + import assertpy import pytest @@ -73,3 +75,25 @@ def test_hash(): s4 = {entity1, entity2, entity3, entity4} assert len(s4) == 3 + + +def test_update_meta_with_entity(): + # Create an entity that is already present in the SQL registry + stored_entity = Entity( + name="my-entity", join_keys=["key"], value_type=ValueType.INT32 + ) + current_time = datetime.now() + stored_entity.created_timestamp = current_time - timedelta(days=1) + stored_entity.last_updated_timestamp = current_time - timedelta(days=1) + stored_entity_proto = stored_entity.to_proto() + serialized_proto = stored_entity_proto.SerializeToString() + + # Update the entity i.e. here it's simply the name + updated_entity = Entity( + name="my-entity-1", join_keys=["key"], value_type=ValueType.INT32 + ) + updated_entity.last_updated_timestamp = current_time + + updated_entity.update_meta(serialized_proto) + assert updated_entity.created_timestamp == stored_entity.created_timestamp + assert updated_entity.last_updated_timestamp == current_time diff --git a/sdk/python/tests/unit/test_feature_service.py b/sdk/python/tests/unit/test_feature_service.py index 4448d2e8ea2..60fcb2069a7 100644 --- a/sdk/python/tests/unit/test_feature_service.py +++ b/sdk/python/tests/unit/test_feature_service.py @@ -1,3 +1,5 @@ +from datetime import datetime, timedelta + from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.field import Field @@ -72,3 +74,39 @@ def test_feature_view_kw_args_normal(): _ = FeatureService( name="my-feature-service", features=[feature_view[["feature1", "feature2"]]] ) + + +def test_update_meta(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + current_time = datetime.now() + # Create a feature service that is already present in the SQL registry + stored_feature_service = FeatureService( + name="test_feature_service", features=[feature_view[["feature1", "feature2"]]] + ) + stored_feature_service.created_timestamp = current_time - timedelta(days=1) + stored_feature_service.last_updated_timestamp = current_time - timedelta(days=1) + stored_feature_service_proto = stored_feature_service.to_proto() + serialized_proto = stored_feature_service_proto.SerializeToString() + + # Update the Feature Service i.e. here it's simply the name + updated_feature_service = FeatureService( + name="my-feature-service-1", features=[feature_view[["feature1", "feature2"]]] + ) + updated_feature_service.last_updated_timestamp = current_time + + updated_feature_service.update_meta(serialized_proto) + + assert ( + updated_feature_service.created_timestamp + == stored_feature_service.created_timestamp + ) + assert updated_feature_service.last_updated_timestamp == current_time diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 0220d1a8a95..d0bc9b036c1 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import datetime, timedelta import pytest from typeguard import TypeCheckError @@ -117,3 +117,50 @@ def test_hash(): def test_field_types(): with pytest.raises(TypeCheckError): Field(name="name", dtype=ValueType.INT32) + + +def test_update_meta(): + batch_source = FileSource(path="some path") + # Create a feature view that is already present in the SQL registry + stored_feature_view = FeatureView( + name="my-feature-view", entities=[], ttl=timedelta(days=1), source=batch_source + ) + current_time = datetime.now() + stored_feature_view.created_timestamp = current_time - timedelta(days=1) + stored_feature_view.last_updated_timestamp = current_time + start_date = current_time - timedelta(days=1) + end_date = current_time + stored_feature_view.materialization_intervals.append((start_date, end_date)) + + stored_feature_view_proto = stored_feature_view.to_proto() + serialized_proto = stored_feature_view_proto.SerializeToString() + + # Update the entity i.e. here it's simply the name + updated_feature_view = FeatureView( + name="my-feature-view-1", + entities=[], + ttl=timedelta(days=1), + source=batch_source, + ) + + updated_feature_view.last_updated_timestamp = current_time + updated_feature_view.materialization_intervals = [] + + updated_feature_view.update_meta(serialized_proto) + + assert ( + updated_feature_view.created_timestamp == stored_feature_view.created_timestamp + ) + assert updated_feature_view.last_updated_timestamp == current_time + assert ( + updated_feature_view.materialization_intervals is not None + and len(updated_feature_view.materialization_intervals) == 1 + ) + assert ( + updated_feature_view.materialization_intervals[0][0] + == stored_feature_view.materialization_intervals[0][0] + ) + assert ( + updated_feature_view.materialization_intervals[0][1] + == stored_feature_view.materialization_intervals[0][1] + ) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index d9cc5dee50d..9ff3436ed09 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from datetime import datetime, timedelta from typing import Any, Dict, List import pandas as pd @@ -263,3 +263,60 @@ def test_from_proto_backwards_compatible_udf(): reserialized_proto.feature_transformation.udf_string == on_demand_feature_view.feature_transformation.udf_string ) + + +def test_update_meta(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + stored_on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + description="test", + mode="python", + ) + current_time = datetime.now() + stored_on_demand_feature_view.created_timestamp = current_time - timedelta(days=1) + stored_on_demand_feature_view.last_updated_timestamp = current_time - timedelta( + days=1 + ) + stored_on_demand_feature_view_proto = stored_on_demand_feature_view.to_proto() + serialized_proto = stored_on_demand_feature_view_proto.SerializeToString() + + updated_on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view-1", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + description="test", + mode="python", + ) + updated_on_demand_feature_view.last_updated_timestamp = current_time + + updated_on_demand_feature_view.update_meta(serialized_proto) + + assert ( + updated_on_demand_feature_view.created_timestamp + == stored_on_demand_feature_view.created_timestamp + ) + assert updated_on_demand_feature_view.last_updated_timestamp == current_time diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py index b53f9a593ae..b4dca2fb44a 100644 --- a/sdk/python/tests/unit/test_stream_feature_view.py +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -1,5 +1,5 @@ import copy -from datetime import timedelta +from datetime import datetime, timedelta import pytest @@ -250,3 +250,92 @@ def test_stream_feature_view_copy(): aggregations=[], ) assert sfv == copy.copy(sfv) + + +def test_update_meta(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + # Create a stream feature view that is already present in the SQL registry + stored_stream_feature_view = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + current_time = datetime.now() + stored_stream_feature_view.created_timestamp = current_time - timedelta(days=1) + stored_stream_feature_view.last_updated_timestamp = current_time - timedelta(days=1) + + start_date = current_time - timedelta(days=1) + end_date = current_time + stored_stream_feature_view.materialization_intervals.append((start_date, end_date)) + stored_stream_feature_view_proto = stored_stream_feature_view.to_proto() + serialized_proto = stored_stream_feature_view_proto.SerializeToString() + + # # Update the stream feature view i.e. here it's simply the name + updated_stream_feature_view = StreamFeatureView( + name="test kafka stream feature view updated", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + updated_stream_feature_view.last_updated_timestamp = current_time + updated_stream_feature_view.materialization_intervals = [] + + updated_stream_feature_view.update_meta(serialized_proto) + + assert ( + updated_stream_feature_view.created_timestamp + == stored_stream_feature_view.created_timestamp + ) + assert updated_stream_feature_view.last_updated_timestamp == current_time + assert ( + updated_stream_feature_view.materialization_intervals is not None + and len(stored_stream_feature_view.materialization_intervals) == 1 + ) + assert ( + updated_stream_feature_view.materialization_intervals[0][0] + == stored_stream_feature_view.materialization_intervals[0][0] + ) + assert ( + updated_stream_feature_view.materialization_intervals[0][1] + == stored_stream_feature_view.materialization_intervals[0][1] + ) diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index 885798db109..1002e472d7c 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -78,6 +78,14 @@ def validate_offline_online_store_consistency( # run materialize_incremental() fs.materialize_incremental(feature_views=[fv.name], end_date=now) + updated_fv = fs.registry.get_feature_view(fv.name, fs.project) + + # Check if created_timestamp and materialization_intervals was updated by the registry + assert ( + updated_fv.created_timestamp is not None + and updated_fv.materialization_intervals is not None + and len(updated_fv.materialization_intervals) > 0 + ) # check result of materialize_incremental() _check_offline_and_online_features( From 15c8720b9c8706d7f208ba3426a96081b2baa84c Mon Sep 17 00:00:00 2001 From: msistla96 Date: Wed, 5 Jun 2024 12:33:33 -0500 Subject: [PATCH 2/6] revert uneeded change Signed-off-by: msistla96 --- Created_timestamp_fix.patch | 626 ------------------------------------ 1 file changed, 626 deletions(-) delete mode 100644 Created_timestamp_fix.patch diff --git a/Created_timestamp_fix.patch b/Created_timestamp_fix.patch deleted file mode 100644 index 98c9248a066..00000000000 --- a/Created_timestamp_fix.patch +++ /dev/null @@ -1,626 +0,0 @@ -Subject: [PATCH] Created_timestamp fix ---- -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt b/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/requirements/py3.8-ci-requirements.txt -@@ -529,7 +529,7 @@ - # mypy - mypy-protobuf==3.1 - # via feast (setup.py) --mysqlclient==2.1.1 -+mysqlclient==2.2.3 - # via feast (setup.py) - nbclassic==1.0.0 - # via notebook -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_service.py -@@ -221,6 +221,12 @@ - - return fs - -+ def update_meta(self, stored_proto): -+ feature_service_proto = self.FromString(stored_proto["feature_service_proto"]) -+ self.created_timestamp = ( -+ feature_service_proto.meta.created_timestamp.ToDatetime() -+ ) -+ - def to_proto(self) -> FeatureServiceProto: - """ - Converts a feature service to its protobuf representation. -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_service.py -@@ -4,6 +4,13 @@ - from feast.infra.offline_stores.file_source import FileSource - from feast.types import Float32 - from tests.utils.test_wrappers import no_warnings -+from feast.protos.feast.core.FeatureService_pb2 import FeatureService as FeatureServiceProto -+from feast.protos.feast.core.FeatureService_pb2 import ( -+ FeatureServiceMeta as FeatureServiceMetaProto, -+) -+from feast.protos.feast.core.FeatureService_pb2 import ( -+ FeatureServiceSpec as FeatureServiceSpecProto, -+) - - - def test_feature_service_with_description(): -@@ -72,3 +79,13 @@ - _ = FeatureService( - name="my-feature-service", features=[feature_view[["feature1", "feature2"]]] - ) -+def test_update_meta(): -+ # Create a feature service with no materialization intervals -+ feature_service = FeatureService(name="test_feature_service", features=[]) -+ feature_service_proto = FeatureServiceProto( -+ spec=FeatureServiceSpecProto(name="test_feature_service", features=[]), -+ meta=FeatureServiceMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp()))) -+ ) -+ stored_proto = {"feature_service_proto": feature_service_proto.SerializeToString()} -+ feature_service.update_meta(stored_proto) -+ assert feature_service.created_timestamp is not None -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/entity.py -@@ -169,6 +169,12 @@ - - return entity - -+ def update_meta(self, stored_proto): -+ entity_proto = self.FromString(stored_proto["entity_proto"]) -+ self.created_timestamp = ( -+ entity_proto.meta.created_timestamp.ToDatetime() -+ ) -+ - def to_proto(self) -> EntityProto: - """ - Converts an entity object to its protobuf representation. -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/infra/registry/sql.py -@@ -998,6 +998,8 @@ - obj.last_updated_timestamp = update_datetime - - if row: -+ if proto_field_name in ["entity_proto","feature_view_proto","feature_service_proto"]: -+ obj.update_meta(row) - values = { - proto_field_name: obj.to_proto().SerializeToString(), - "last_updated_timestamp": update_time, -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_feature_views.py -@@ -1,17 +1,34 @@ --from datetime import timedelta -+from datetime import timedelta, datetime - - import pytest -+from google.protobuf.internal.well_known_types import Timestamp - from pydantic import ValidationError - - from feast.aggregation import Aggregation - from feast.batch_feature_view import BatchFeatureView --from feast.data_format import AvroFormat -+from feast.data_format import AvroFormat, ParquetFormat - from feast.data_source import KafkaSource, PushSource - from feast.entity import Entity - from feast.feature_view import FeatureView - from feast.field import Field - from feast.infra.offline_stores.file_source import FileSource - from feast.protos.feast.types.Value_pb2 import ValueType -+from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto -+from feast.protos.feast.core.FeatureView_pb2 import ( -+ FeatureViewMeta as FeatureViewMetaProto, -+) -+from feast.protos.feast.core.FeatureView_pb2 import ( -+ FeatureViewSpec as FeatureViewSpecProto, -+) -+from feast.protos.feast.core.FeatureView_pb2 import ( -+ MaterializationInterval as MaterializationIntervalProto, -+) -+from feast.protos.feast.core.StreamFeatureView_pb2 import ( -+ StreamFeatureView as StreamFeatureViewProto, -+) -+from feast.protos.feast.core.StreamFeatureView_pb2 import ( -+ StreamFeatureViewSpec as StreamFeatureViewSpecProto, -+) - from feast.stream_feature_view import StreamFeatureView, stream_feature_view - from feast.types import Float32 - -@@ -278,3 +295,94 @@ - def test_field_types(): - with pytest.raises(ValidationError): - Field(name="name", dtype=ValueType.INT32) -+ -+ -+def test_update_meta_with_feature_view(): -+ # Create a feature view with no materialization intervals -+ feature_view = FeatureView(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1), -+ source="test_source") -+ feature_view_proto = FeatureViewProto( -+ spec=FeatureViewSpecProto(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), -+ source="test_source"), -+ meta=FeatureViewMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), -+ materialization_intervals=[]) -+ ) -+ stored_proto = {"feature_view_proto": feature_view_proto.SerializeToString()} -+ feature_view.update_meta(stored_proto) -+ assert feature_view.created_timestamp is not None -+ assert feature_view.materialization_intervals is None -+ -+ # Simulate feature view getting materialized, i.e: materialization intervals is not empty -+ batch_source = FileSource(path="some path") -+ updated_feature_view = FeatureView(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1), -+ source=batch_source) -+ start_date = datetime.now() - timedelta(days=1) -+ end_date = datetime.now() -+ start_time = Timestamp(seconds=int(start_date).timestamp()) -+ end_time = Timestamp(seconds=int(end_date.timestamp())) -+ updated_feature_view_proto = FeatureViewProto( -+ spec=FeatureViewSpecProto(name="test_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), -+ source="test_source"), -+ meta=FeatureViewMetaProto( -+ created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), -+ materialization_intervals=[ -+ MaterializationIntervalProto( -+ start_time=start_time, -+ end_time=end_time -+ ) -+ ] -+ ) -+ ) -+ stored_proto = {"feature_view_proto": updated_feature_view_proto.SerializeToString()} -+ updated_feature_view.update_meta(stored_proto) -+ assert feature_view.created_timestamp is not None -+ assert feature_view.last_updated_timestamp is not None -+ assert feature_view.materialization_intervals is not None and len(feature_view.materialization_intervals) == 1 -+ assert feature_view.materialization_intervals[0][0] == start_date -+ assert feature_view.materialization_intervals[0][1] == end_date -+ -+ -+def test_update_meta_with_stream_feature_view(): -+ # Create a stream feature view with no materialization intervals -+ batch_source = FileSource(path="some path") -+ stream_feature_view = StreamFeatureView(name="test_stream_feature_view", entities=["entity1"], -+ ttl=timedelta(days=1), source=batch_source) -+ stream_feature_view_proto = StreamFeatureViewProto( -+ spec=StreamFeatureViewSpecProto(name="test_stream_feature_view", entities=["entity1"], -+ ttl=timedelta(days=1).total_seconds(), source="test_source"), -+ meta=FeatureViewMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), -+ materialization_intervals=[]) -+ ) -+ stored_proto = {"stream_feature_view_proto": stream_feature_view_proto.SerializeToString()} -+ stream_feature_view.update_meta(stored_proto) -+ assert stream_feature_view.created_timestamp is not None -+ assert stream_feature_view.materialization_intervals is None -+ -+ # Simulate stream feature view getting materialized, i.e: materialization intervals is not empty -+ updated_stream_feature_view = StreamFeatureView(name="test_stream_feature_view", entities=["entity1"], -+ ttl=timedelta(days=1), source="test_source") -+ start_date = datetime.now() - timedelta(days=1) -+ end_date = datetime.now() -+ start_time = Timestamp(seconds=int(start_date).timestamp()) -+ end_time = Timestamp(seconds=int(end_date.timestamp())) -+ updated_stream_feature_view_proto = StreamFeatureViewProto( -+ spec=StreamFeatureViewSpecProto(name="test_stream_feature_view", entities=["entity1"], -+ ttl=timedelta(days=1).total_seconds(), source="test_source"), -+ meta=FeatureViewMetaProto( -+ created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), -+ materialization_intervals=[ -+ MaterializationIntervalProto( -+ start_time=start_time, -+ end_time=end_time -+ ) -+ ] -+ ) -+ ) -+ stored_proto = {"stream_feature_view_proto": updated_stream_feature_view_proto.SerializeToString()} -+ updated_stream_feature_view.update_meta(stored_proto) -+ assert updated_stream_feature_view.created_timestamp is not None -+ assert updated_stream_feature_view.last_updated_timestamp is not None -+ assert updated_stream_feature_view.materialization_intervals is not None and len( -+ updated_stream_feature_view.materialization_intervals) == 1 -+ assert updated_stream_feature_view.materialization_intervals[0][0] == start_date -+ assert updated_stream_feature_view.materialization_intervals[0][1] == end_date -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/saved_dataset.py -@@ -140,6 +140,17 @@ - - return True - -+ def update_meta(self, stored_proto): -+ saved_dataset_proto = self.FromString(stored_proto["saved_dataset_proto"]) -+ self.created_timestamp = ( -+ saved_dataset_proto.meta.created_timestamp.ToDatetime() -+ ) -+ self.min_event_timestamp = ( -+ saved_dataset_proto.meta.min_event_timestamp.ToDatetime() -+ ) -+ self.max_event_timestamp = ( -+ saved_dataset_proto.meta.max_event_timestamp.ToDatetime() -+ ) - @staticmethod - def from_proto(saved_dataset_proto: SavedDatasetProto): - """ -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_entity.py -@@ -16,7 +16,13 @@ - - from feast.entity import Entity - from feast.value_type import ValueType -- -+from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto -+from feast.protos.feast.core.Entity_pb2 import ( -+ EntityMeta as EntityMetaProto, -+) -+from feast.protos.feast.core.Entity_pb2 import ( -+ EntitySpecV2 as EntitySpecProto, -+) - - def test_join_key_default(): - entity = Entity(name="my-entity", description="My entity") -@@ -73,3 +79,14 @@ - - s4 = {entity1, entity2, entity3, entity4} - assert len(s4) == 3 -+ -+def test_update_meta_with_entity(): -+ # Create an entity with no materialization intervals -+ entity = Entity(name="test_entity", join_keys=["key"], value_type=ValueType.INT32) -+ entity_proto = EntityProto( -+ spec=EntitySpecProto(name="test_entity", value_type=ValueType.INT32.value, join_key="key"), -+ meta=EntityMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp()))) -+ ) -+ stored_proto = {"entity_proto": entity_proto.SerializeToString()} -+ entity.update_meta(stored_proto) -+ assert entity.created_timestamp is not None -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/stream_feature_view.py -@@ -74,22 +74,22 @@ - udf_string: Optional[str] - - def __init__( -- self, -- *, -- name: str, -- source: DataSource, -- entities: Optional[Union[List[Entity], List[str]]] = None, -- ttl: timedelta = timedelta(days=0), -- tags: Optional[Dict[str, str]] = None, -- online: Optional[bool] = True, -- description: Optional[str] = "", -- owner: Optional[str] = "", -- schema: Optional[List[Field]] = None, -- aggregations: Optional[List[Aggregation]] = None, -- mode: Optional[str] = "spark", -- timestamp_field: Optional[str] = "", -- udf: Optional[FunctionType] = None, -- udf_string: Optional[str] = "", -+ self, -+ *, -+ name: str, -+ source: DataSource, -+ entities: Optional[Union[List[Entity], List[str]]] = None, -+ ttl: timedelta = timedelta(days=0), -+ tags: Optional[Dict[str, str]] = None, -+ online: Optional[bool] = True, -+ description: Optional[str] = "", -+ owner: Optional[str] = "", -+ schema: Optional[List[Field]] = None, -+ aggregations: Optional[List[Aggregation]] = None, -+ mode: Optional[str] = "spark", -+ timestamp_field: Optional[str] = "", -+ udf: Optional[FunctionType] = None, -+ udf_string: Optional[str] = "", - ): - if not flags_helper.is_test(): - warnings.warn( -@@ -99,8 +99,8 @@ - ) - - if ( -- type(source).__name__ not in SUPPORTED_STREAM_SOURCES -- and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE -+ type(source).__name__ not in SUPPORTED_STREAM_SOURCES -+ and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE - ): - raise ValueError( - f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} " -@@ -143,11 +143,11 @@ - return False - - if ( -- self.mode != other.mode -- or self.timestamp_field != other.timestamp_field -- or self.udf.__code__.co_code != other.udf.__code__.co_code -- or self.udf_string != other.udf_string -- or self.aggregations != other.aggregations -+ self.mode != other.mode -+ or self.timestamp_field != other.timestamp_field -+ or self.udf.__code__.co_code != other.udf.__code__.co_code -+ or self.udf_string != other.udf_string -+ or self.aggregations != other.aggregations - ): - return False - -@@ -156,6 +156,20 @@ - def __hash__(self) -> int: - return super().__hash__() - -+ def update_meta(self, stored_proto): -+ stream_feature_view_proto = self.FromString(stored_proto["feature_view_proto"]) -+ self.created_timestamp = ( -+ stream_feature_view_proto.meta.created_timestamp.ToDatetime() -+ ) -+ -+ for interval in stream_feature_view_proto.meta.materialization_intervals: -+ self.materialization_intervals.append( -+ ( -+ utils.make_tzaware(interval.start_time.ToDatetime()), -+ utils.make_tzaware(interval.end_time.ToDatetime()), -+ ) -+ ) -+ - def to_proto(self): - meta = self.to_proto_meta() - ttl_duration = self.get_ttl_duration() -@@ -301,18 +315,18 @@ - - - def stream_feature_view( -- *, -- entities: Optional[Union[List[Entity], List[str]]] = None, -- ttl: Optional[timedelta] = None, -- tags: Optional[Dict[str, str]] = None, -- online: Optional[bool] = True, -- description: Optional[str] = "", -- owner: Optional[str] = "", -- schema: Optional[List[Field]] = None, -- source: Optional[DataSource] = None, -- aggregations: Optional[List[Aggregation]] = None, -- mode: Optional[str] = "spark", -- timestamp_field: Optional[str] = "", -+ *, -+ entities: Optional[Union[List[Entity], List[str]]] = None, -+ ttl: Optional[timedelta] = None, -+ tags: Optional[Dict[str, str]] = None, -+ online: Optional[bool] = True, -+ description: Optional[str] = "", -+ owner: Optional[str] = "", -+ schema: Optional[List[Field]] = None, -+ source: Optional[DataSource] = None, -+ aggregations: Optional[List[Aggregation]] = None, -+ mode: Optional[str] = "spark", -+ timestamp_field: Optional[str] = "", - ): - """ - Creates an StreamFeatureView object with the given user function as udf. -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/on_demand_feature_view.py -@@ -171,6 +171,12 @@ - def __hash__(self): - return super().__hash__() - -+ def update_meta(self, stored_proto): -+ on_demand_feature_view_proto = self.FromString(stored_proto["feature_view_proto"]) -+ self.created_timestamp = ( -+ on_demand_feature_view_proto.meta.created_timestamp.ToDatetime() -+ ) -+ - def to_proto(self) -> OnDemandFeatureViewProto: - """ - Converts an on demand feature view object to its protobuf representation. -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/base_feature_view.py -@@ -51,13 +51,13 @@ - - @abstractmethod - def __init__( -- self, -- *, -- name: str, -- features: Optional[List[Field]] = None, -- description: str = "", -- tags: Optional[Dict[str, str]] = None, -- owner: str = "", -+ self, -+ *, -+ name: str, -+ features: Optional[List[Field]] = None, -+ description: str = "", -+ tags: Optional[Dict[str, str]] = None, -+ owner: str = "", - ): - """ - Creates a BaseFeatureView object. -@@ -140,12 +140,12 @@ - ) - - if ( -- self.name != other.name -- or sorted(self.features) != sorted(other.features) -- or self.projection != other.projection -- or self.description != other.description -- or self.tags != other.tags -- or self.owner != other.owner -+ self.name != other.name -+ or sorted(self.features) != sorted(other.features) -+ or self.projection != other.projection -+ or self.description != other.description -+ or self.tags != other.tags -+ or self.owner != other.owner - ): - return False - -@@ -229,3 +229,6 @@ - cp.projection = feature_view_projection - - return cp -+ -+ def update_obj_meta(self, row): -+ pass -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/tests/unit/test_on_demand_feature_view.py -@@ -11,6 +11,7 @@ - # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - # See the License for the specific language governing permissions and - # limitations under the License. -+from datetime import timedelta - - import pandas as pd - -@@ -19,7 +20,16 @@ - from feast.infra.offline_stores.file_source import FileSource - from feast.on_demand_feature_view import OnDemandFeatureView - from feast.types import Float32 -- -+from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureView as OnDemandFeatureViewProto -+from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( -+ OnDemandFeatureViewMeta as OnDemandFeatureViewMetaProto, -+) -+from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( -+ OnDemandFeatureViewSpec as OnDemandFeatureViewSpecProto, -+) -+from feast.protos.feast.core.FeatureView_pb2 import ( -+ MaterializationInterval as MaterializationIntervalProto, -+) - - def udf1(features_df: pd.DataFrame) -> pd.DataFrame: - df = pd.DataFrame() -@@ -105,3 +115,42 @@ - on_demand_feature_view_4, - } - assert len(s4) == 3 -+ -+def test_update_meta(): -+ # Create an on demand feature view with no materialization intervals -+ on_demand_feature_view = OnDemandFeatureView(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1), source="test_source") -+ on_demand_feature_view_proto = OnDemandFeatureViewProto( -+ spec=OnDemandFeatureViewSpecProto(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), source="test_source"), -+ meta=OnDemandFeatureViewMetaProto(created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), -+ materialization_intervals=[]) -+ ) -+ stored_proto = {"on_demand_feature_view_proto": on_demand_feature_view_proto.SerializeToString()} -+ on_demand_feature_view.update_meta(stored_proto) -+ assert on_demand_feature_view.created_timestamp is not None -+ assert on_demand_feature_view.materialization_intervals is None -+ -+ # Simulate on demand feature view getting materialized, i.e: materialization intervals is not empty -+ updated_on_demand_feature_view = OnDemandFeatureView(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1), source="test_source") -+ start_date = datetime.now() - timedelta(days=1) -+ end_date = datetime.now() -+ start_time = Timestamp(seconds=int(start_date).timestamp()) -+ end_time= Timestamp(seconds=int(end_date.timestamp())) -+ updated_on_demand_feature_view_proto = OnDemandFeatureViewProto( -+ spec=OnDemandFeatureViewSpecProto(name="test_on_demand_feature_view", entities=["entity1"], ttl=timedelta(days=1).total_seconds(), source="test_source"), -+ meta=OnDemandFeatureViewMetaProto( -+ created_timestamp=Timestamp(seconds=int(datetime.now().timestamp())), -+ materialization_intervals=[ -+ MaterializationIntervalProto( -+ start_time=start_time, -+ end_time=end_time -+ ) -+ ] -+ ) -+ ) -+ stored_proto = {"on_demand_feature_view_proto": updated_on_demand_feature_view_proto.SerializeToString()} -+ updated_on_demand_feature_view.update_meta(stored_proto) -+ assert updated_on_demand_feature_view.created_timestamp is not None -+ assert updated_on_demand_feature_view.last_updated_timestamp is not None -+ assert updated_on_demand_feature_view.materialization_intervals is not None and len(updated_on_demand_feature_view.materialization_intervals) == 1 -+ assert updated_on_demand_feature_view.materialization_intervals[0][0] == start_date -+ assert updated_on_demand_feature_view.materialization_intervals[0][1] == end_date -Index: feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py ---- a/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py -+++ b/feature-lifecycle-codebase/Feast/feast/sdk/python/feast/feature_view.py -@@ -332,6 +332,19 @@ - - return cp - -+ def update_meta(self, stored_proto): -+ feature_view_proto = self.FromString(stored_proto["feature_view_proto"]) -+ self.created_timestamp = ( -+ feature_view_proto.meta.created_timestamp.ToDatetime() -+ ) -+ -+ for interval in feature_view_proto.meta.materialization_intervals: -+ self.materialization_intervals.append( -+ ( -+ utils.make_tzaware(interval.start_time.ToDatetime()), -+ utils.make_tzaware(interval.end_time.ToDatetime()), -+ ) -+ ) - def to_proto(self) -> FeatureViewProto: - """ - Converts a feature view object to its protobuf representation. From ca8855369c87f29c02cb6b89a7af4555ea1a91f2 Mon Sep 17 00:00:00 2001 From: msistla96 Date: Sat, 8 Jun 2024 22:42:31 -0500 Subject: [PATCH 3/6] refactor code, tests to include changes for other registries Signed-off-by: msistla96 --- sdk/python/feast/base_feature_view.py | 3 - sdk/python/feast/entity.py | 4 - sdk/python/feast/feature_service.py | 6 -- sdk/python/feast/feature_view.py | 14 ++- .../feast/infra/registry/base_registry.py | 28 ++++++ sdk/python/feast/infra/registry/registry.py | 34 ++++++- sdk/python/feast/infra/registry/sql.py | 11 ++- sdk/python/feast/on_demand_feature_view.py | 6 -- sdk/python/feast/saved_dataset.py | 10 -- sdk/python/feast/stream_feature_view.py | 14 --- .../registration/test_universal_registry.py | 99 ++++++++++++------- .../test_local_feature_store.py | 14 +++ sdk/python/tests/unit/test_entity.py | 23 ----- sdk/python/tests/unit/test_feature_service.py | 38 ------- sdk/python/tests/unit/test_feature_views.py | 57 ++++++----- .../tests/unit/test_on_demand_feature_view.py | 58 ----------- .../tests/unit/test_stream_feature_view.py | 18 +--- sdk/python/tests/utils/e2e_test_validation.py | 5 +- 18 files changed, 189 insertions(+), 253 deletions(-) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 3e141d36972..31140e28999 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -238,6 +238,3 @@ def with_projection(self, feature_view_projection: FeatureViewProjection): cp.projection = feature_view_projection return cp - - def update_meta(self, serialized_proto: bytes): - pass diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 521ed0ccf41..a988c200d7c 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -167,10 +167,6 @@ def from_proto(cls, entity_proto: EntityProto): return entity - def update_meta(self, stored_proto: bytes): - entity_proto = EntityProto.FromString(stored_proto) - self.created_timestamp = entity_proto.meta.created_timestamp.ToDatetime() - def to_proto(self) -> EntityProto: """ Converts an entity object to its protobuf representation. diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 4860bd08ccd..8b8cbac8ea2 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -219,12 +219,6 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto): return fs - def update_meta(self, stored_proto: bytes): - feature_service_proto = FeatureServiceProto.FromString(stored_proto) - self.created_timestamp = ( - feature_service_proto.meta.created_timestamp.ToDatetime() - ) - def to_proto(self) -> FeatureServiceProto: """ Converts a feature service to its protobuf representation. diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 11b826c6de2..0d1dbf04b7a 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -311,14 +311,12 @@ def with_join_key_map(self, join_key_map: Dict[str, str]): return cp - def update_meta(self, stored_proto: bytes): - feature_view_proto = FeatureViewProto.FromString(stored_proto) - self.created_timestamp = feature_view_proto.meta.created_timestamp.ToDatetime() - - for interval in feature_view_proto.meta.materialization_intervals: - self.materialization_intervals.append( - (interval.start_time.ToDatetime(), interval.end_time.ToDatetime()) - ) + def update_materialization_intervals(self, existing_materialization_intervals): + if existing_materialization_intervals: + for interval in existing_materialization_intervals: + self.materialization_intervals.append( + (interval.start_time.ToDatetime(), interval.end_time.ToDatetime()) + ) def to_proto(self) -> FeatureViewProto: """ diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index b52749a9b2f..55012b97db3 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -29,7 +29,19 @@ from feast.infra.infra_object import Infra from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureService as FeatureServiceProto, +) +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureView as OnDemandFeatureViewProto, +) from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView from feast.transformation.pandas_transformation import PandasTransformation @@ -678,3 +690,19 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: self._message_to_sorted_dict(infra_object.to_proto()) ) return registry_dict + + @staticmethod + def deserialize_registry_values(serialized_proto, feast_obj_type) -> Any: + if feast_obj_type == Entity: + return EntityProto.FromString(serialized_proto) + if feast_obj_type == SavedDataset: + return SavedDatasetProto.FromString(serialized_proto) + if feast_obj_type == FeatureView: + return FeatureViewProto.FromString(serialized_proto) + if feast_obj_type == StreamFeatureView: + return StreamFeatureViewProto.FromString(serialized_proto) + if feast_obj_type == OnDemandFeatureView: + return OnDemandFeatureViewProto.FromString(serialized_proto) + if feast_obj_type == FeatureService: + return FeatureServiceProto.FromString(serialized_proto) + return None diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index df1a419ccf7..e4dd8fa69f7 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -265,9 +265,13 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): existing_entity_proto.spec.name == entity_proto.spec.name and existing_entity_proto.spec.project == project ): + entity.created_timestamp = ( + existing_entity_proto.meta.created_timestamp.ToDatetime() + ) + entity_proto = entity.to_proto() + entity_proto.spec.project = project del self.cached_registry_proto.entities[idx] break - self.cached_registry_proto.entities.append(entity_proto) if commit: self.commit() @@ -338,6 +342,11 @@ def apply_feature_service( == feature_service_proto.spec.name and existing_feature_service_proto.spec.project == project ): + feature_service.created_timestamp = ( + existing_feature_service_proto.meta.created_timestamp.ToDatetime() + ) + feature_service_proto = feature_service.to_proto() + feature_service_proto.spec.project = project del registry.feature_services[idx] registry.feature_services.append(feature_service_proto) if commit: @@ -410,6 +419,18 @@ def apply_feature_view( ): return else: + existing_feature_view = type(feature_view).from_proto( + existing_feature_view_proto + ) + feature_view.created_timestamp = ( + existing_feature_view.created_timestamp.replace(tzinfo=None) + ) + if isinstance(feature_view, (FeatureView, StreamFeatureView)): + feature_view.update_materialization_intervals( + existing_feature_view_proto.meta.materialization_intervals + ) + feature_view_proto = feature_view.to_proto() + feature_view_proto.spec.project = project del existing_feature_views_of_same_type[idx] break @@ -638,6 +659,17 @@ def apply_saved_dataset( existing_saved_dataset_proto.spec.name == saved_dataset_proto.spec.name and existing_saved_dataset_proto.spec.project == project ): + saved_dataset.created_timestamp = ( + existing_saved_dataset_proto.meta.created_timestamp.ToDatetime() + ) + saved_dataset.min_event_timestamp = ( + existing_saved_dataset_proto.meta.min_event_timestamp.ToDatetime() + ) + saved_dataset.max_event_timestamp = ( + existing_saved_dataset_proto.meta.max_event_timestamp.ToDatetime() + ) + saved_dataset_proto = saved_dataset.to_proto() + saved_dataset_proto.spec.project = project del self.cached_registry_proto.saved_datasets[idx] break diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 7e747889b38..8c936381a6e 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -693,7 +693,16 @@ def _apply_object( "feature_view_proto", "feature_service_proto", ]: - obj.update_meta(row._mapping[proto_field_name]) + deserialized_proto = self.deserialize_registry_values( + row._mapping[proto_field_name], type(obj) + ) + obj.created_timestamp = ( + deserialized_proto.meta.created_timestamp.ToDatetime() + ) + if isinstance(obj, (FeatureView, StreamFeatureView)): + obj.update_materialization_intervals( + deserialized_proto.meta.materialization_intervals + ) values = { proto_field_name: obj.to_proto().SerializeToString(), "last_updated_timestamp": update_time, diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index afda73710d4..839ce4d64ca 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -201,12 +201,6 @@ def __eq__(self, other): def __hash__(self): return super().__hash__() - def update_meta(self, stored_proto: bytes): - on_demand_feature_view_proto = OnDemandFeatureViewProto.FromString(stored_proto) - self.created_timestamp = ( - on_demand_feature_view_proto.meta.created_timestamp.ToDatetime() - ) - def to_proto(self) -> OnDemandFeatureViewProto: """ Converts an on demand feature view object to its protobuf representation. diff --git a/sdk/python/feast/saved_dataset.py b/sdk/python/feast/saved_dataset.py index aafd5d4db74..4a3043a8731 100644 --- a/sdk/python/feast/saved_dataset.py +++ b/sdk/python/feast/saved_dataset.py @@ -140,16 +140,6 @@ def __eq__(self, other): return True - def update_meta(self, stored_proto: bytes): - saved_dataset_proto = SavedDatasetProto.FromString(stored_proto) - self.created_timestamp = saved_dataset_proto.meta.created_timestamp.ToDatetime() - self.min_event_timestamp = ( - saved_dataset_proto.meta.min_event_timestamp.ToDatetime() - ) - self.max_event_timestamp = ( - saved_dataset_proto.meta.max_event_timestamp.ToDatetime() - ) - @staticmethod def from_proto(saved_dataset_proto: SavedDatasetProto): """ diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 47a40165413..50e1a221456 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -167,20 +167,6 @@ def __eq__(self, other): def __hash__(self) -> int: return super().__hash__() - def update_meta(self, stored_proto: bytes): - stream_feature_view_proto = StreamFeatureViewProto.FromString(stored_proto) - self.created_timestamp = ( - stream_feature_view_proto.meta.created_timestamp.ToDatetime() - ) - - for interval in stream_feature_view_proto.meta.materialization_intervals: - self.materialization_intervals.append( - ( - interval.start_time.ToDatetime(), - interval.end_time.ToDatetime(), - ) - ) - def to_proto(self): meta = self.to_proto_meta() ttl_duration = self.get_ttl_duration() diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index f69c4987f1a..a1bd233ff13 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -309,11 +309,20 @@ def test_apply_entity_success(test_registry): assert entity.created_timestamp == entity.last_updated_timestamp # Update entity - entity.description = "Car driver ID" - test_registry.apply_entity(entity, project) + updated_entity = Entity( + name="driver_car_id", + description="Car driver Id", + tags={"team": "matchmaking"}, + ) + test_registry.apply_entity(updated_entity, project) + + updated_entity = test_registry.get_entity("driver_car_id", project) # The created_timestamp for the entity should be set to the created_timestamp value stored from the previous apply - assert entity.created_timestamp is not None + assert ( + updated_entity.created_timestamp is not None + and updated_entity.created_timestamp == entity.created_timestamp + ) test_registry.delete_entity("driver_car_id", project) assert_project_uuid(project, project_uuid, test_registry) @@ -671,6 +680,8 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data["odfv1_my_feature_2"] = feature_df["my_input_1"].astype("int32") return data + existing_odfv = test_registry.get_on_demand_feature_view("odfv1", project) + # Apply the modified odfv test_registry.apply_feature_view(odfv1, project) @@ -705,6 +716,11 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and list(request_schema.values())[0] == ValueType.INT32 ) + assert ( + feature_view.created_timestamp is not None + and feature_view.created_timestamp == existing_odfv.created_timestamp + ) + # Make sure fv1 is untouched feature_views = test_registry.list_feature_views(project) @@ -726,7 +742,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) # Modify fv1 by changing a single dtype - fv1 = FeatureView( + updated_fv1 = FeatureView( name="my_feature_view_1", schema=[ Field(name="test", dtype=Int64), @@ -739,32 +755,35 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) # Apply the modified fv1 - test_registry.apply_feature_view(fv1, project) + test_registry.apply_feature_view(updated_fv1, project) # Verify feature view after modification - feature_views = test_registry.list_feature_views(project) + updated_feature_views = test_registry.list_feature_views(project) # List Feature Views assert ( - len(feature_views) == 1 - and feature_views[0].name == "my_feature_view_1" - and feature_views[0].features[0].name == "fs1_my_feature_1" - and feature_views[0].features[0].dtype == String - and feature_views[0].entities[0] == "fs1_my_entity_1" + len(updated_feature_views) == 1 + and updated_feature_views[0].name == "my_feature_view_1" + and updated_feature_views[0].features[0].name == "fs1_my_feature_1" + and updated_feature_views[0].features[0].dtype == String + and updated_feature_views[0].entities[0] == "fs1_my_entity_1" ) - feature_view = test_registry.get_feature_view("my_feature_view_1", project) + updated_feature_view = test_registry.get_feature_view("my_feature_view_1", project) assert ( - feature_view.name == "my_feature_view_1" - and feature_view.features[0].name == "fs1_my_feature_1" - and feature_view.features[0].dtype == String - and feature_view.entities[0] == "fs1_my_entity_1" + updated_feature_view.name == "my_feature_view_1" + and updated_feature_view.features[0].name == "fs1_my_feature_1" + and updated_feature_view.features[0].dtype == String + and updated_feature_view.entities[0] == "fs1_my_entity_1" ) # The created_timestamp for the feature view should be set to the created_timestamp value stored from the # previous apply - assert feature_view.created_timestamp is not None + assert ( + updated_feature_view.created_timestamp is not None + and updated_feature_view.created_timestamp == feature_view.created_timestamp + ) # Modify sfv by changing the dtype @@ -795,34 +814,40 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: tags={}, ) + existing_sfv = test_registry.get_stream_feature_view( + "test kafka stream feature view", project + ) # Apply the modified sfv test_registry.apply_feature_view(sfv, project) # Verify feature view after modification - feature_views = test_registry.list_stream_feature_views(project) + updated_stream_feature_views = test_registry.list_stream_feature_views(project) # List Feature Views assert ( - len(feature_views) == 1 - and feature_views[0].name == "test kafka stream feature view" - and feature_views[0].features[0].name == "dummy_field" - and feature_views[0].features[0].dtype == String - and feature_views[0].entities[0] == "sfv_my_entity_1" + len(updated_stream_feature_views) == 1 + and updated_stream_feature_views[0].name == "test kafka stream feature view" + and updated_stream_feature_views[0].features[0].name == "dummy_field" + and updated_stream_feature_views[0].features[0].dtype == String + and updated_stream_feature_views[0].entities[0] == "sfv_my_entity_1" ) - feature_view = test_registry.get_stream_feature_view( + updated_sfv = test_registry.get_stream_feature_view( "test kafka stream feature view", project ) assert ( - feature_view.name == "test kafka stream feature view" - and feature_view.features[0].name == "dummy_field" - and feature_view.features[0].dtype == String - and feature_view.entities[0] == "sfv_my_entity_1" + updated_sfv.name == "test kafka stream feature view" + and updated_sfv.features[0].name == "dummy_field" + and updated_sfv.features[0].dtype == String + and updated_sfv.entities[0] == "sfv_my_entity_1" ) # The created_timestamp for the stream feature view should be set to the created_timestamp value stored from the # previous apply - assert feature_view.created_timestamp is not None + assert ( + updated_sfv.created_timestamp is not None + and updated_sfv.created_timestamp == existing_sfv.created_timestamp + ) test_registry.teardown() @@ -1059,10 +1084,6 @@ def test_modify_feature_service_success(test_registry): assert len(feature_services) == 1 assert feature_services[0] == fs - test_registry.delete_feature_service("my_feature_service_1", project) - feature_services = test_registry.list_feature_services(project) - assert len(feature_services) == 0 - # Modify Feature Service by removing a feature fs = FeatureService( name="my_feature_service_1", features=[feature_view[["feature1"]]] @@ -1071,14 +1092,18 @@ def test_modify_feature_service_success(test_registry): # Apply modified Feature Service test_registry.apply_feature_service(fs, project) - feature_services = test_registry.list_feature_services(project) + updated_feature_services = test_registry.list_feature_services(project) # Verify Feature Services - assert len(feature_services) == 1 - assert feature_services[0] == fs + assert len(updated_feature_services) == 1 + assert updated_feature_services[0] == fs # The created_timestamp for the feature service should be set to the created_timestamp value stored from the # previous apply - assert feature_services[0].created_timestamp is not None + assert ( + updated_feature_services[0].created_timestamp is not None + and updated_feature_services[0].created_timestamp + == feature_services[0].created_timestamp + ) test_registry.teardown() diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index b3e6762c17d..0ce3fad98de 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -321,6 +321,20 @@ def test_reapply_feature_view(test_feature_store, dataframe_source): # Check Feature View fv_stored = test_feature_store.get_feature_view(fv1.name) + assert len(fv_stored.materialization_intervals) == 1 + + # Change and apply Feature View, this time, only the name + fv2 = FeatureView( + name="my_feature_view_2", + schema=[Field(name="int64_col", dtype=Int64)], + entities=[e], + source=file_source, + ttl=timedelta(minutes=5), + ) + test_feature_store.apply([fv2]) + + # Check Feature View + fv_stored = test_feature_store.get_feature_view(fv2.name) assert len(fv_stored.materialization_intervals) == 0 test_feature_store.teardown() diff --git a/sdk/python/tests/unit/test_entity.py b/sdk/python/tests/unit/test_entity.py index 969720f7f53..522467f6bfa 100644 --- a/sdk/python/tests/unit/test_entity.py +++ b/sdk/python/tests/unit/test_entity.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime, timedelta import assertpy import pytest @@ -75,25 +74,3 @@ def test_hash(): s4 = {entity1, entity2, entity3, entity4} assert len(s4) == 3 - - -def test_update_meta_with_entity(): - # Create an entity that is already present in the SQL registry - stored_entity = Entity( - name="my-entity", join_keys=["key"], value_type=ValueType.INT32 - ) - current_time = datetime.now() - stored_entity.created_timestamp = current_time - timedelta(days=1) - stored_entity.last_updated_timestamp = current_time - timedelta(days=1) - stored_entity_proto = stored_entity.to_proto() - serialized_proto = stored_entity_proto.SerializeToString() - - # Update the entity i.e. here it's simply the name - updated_entity = Entity( - name="my-entity-1", join_keys=["key"], value_type=ValueType.INT32 - ) - updated_entity.last_updated_timestamp = current_time - - updated_entity.update_meta(serialized_proto) - assert updated_entity.created_timestamp == stored_entity.created_timestamp - assert updated_entity.last_updated_timestamp == current_time diff --git a/sdk/python/tests/unit/test_feature_service.py b/sdk/python/tests/unit/test_feature_service.py index 60fcb2069a7..4448d2e8ea2 100644 --- a/sdk/python/tests/unit/test_feature_service.py +++ b/sdk/python/tests/unit/test_feature_service.py @@ -1,5 +1,3 @@ -from datetime import datetime, timedelta - from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.field import Field @@ -74,39 +72,3 @@ def test_feature_view_kw_args_normal(): _ = FeatureService( name="my-feature-service", features=[feature_view[["feature1", "feature2"]]] ) - - -def test_update_meta(): - file_source = FileSource(name="my-file-source", path="test.parquet") - feature_view = FeatureView( - name="my-feature-view", - entities=[], - schema=[ - Field(name="feature1", dtype=Float32), - Field(name="feature2", dtype=Float32), - ], - source=file_source, - ) - current_time = datetime.now() - # Create a feature service that is already present in the SQL registry - stored_feature_service = FeatureService( - name="test_feature_service", features=[feature_view[["feature1", "feature2"]]] - ) - stored_feature_service.created_timestamp = current_time - timedelta(days=1) - stored_feature_service.last_updated_timestamp = current_time - timedelta(days=1) - stored_feature_service_proto = stored_feature_service.to_proto() - serialized_proto = stored_feature_service_proto.SerializeToString() - - # Update the Feature Service i.e. here it's simply the name - updated_feature_service = FeatureService( - name="my-feature-service-1", features=[feature_view[["feature1", "feature2"]]] - ) - updated_feature_service.last_updated_timestamp = current_time - - updated_feature_service.update_meta(serialized_proto) - - assert ( - updated_feature_service.created_timestamp - == stored_feature_service.created_timestamp - ) - assert updated_feature_service.last_updated_timestamp == current_time diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index d0bc9b036c1..da43bbabbaf 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -119,48 +119,51 @@ def test_field_types(): Field(name="name", dtype=ValueType.INT32) -def test_update_meta(): +def test_update_materialization_intervals(): batch_source = FileSource(path="some path") + entity = Entity(name="entity_1", description="Some entity") # Create a feature view that is already present in the SQL registry stored_feature_view = FeatureView( - name="my-feature-view", entities=[], ttl=timedelta(days=1), source=batch_source + name="my-feature-view", + entities=[entity], + ttl=timedelta(days=1), + source=batch_source, + ) + + # Update the Feature View without modifying anything + updated_feature_view = FeatureView( + name="my-feature-view", + entities=[entity], + ttl=timedelta(days=1), + source=batch_source, + ) + updated_feature_view.update_materialization_intervals( + stored_feature_view.to_proto().meta.materialization_intervals ) + assert len(updated_feature_view.materialization_intervals) == 0 + current_time = datetime.now() - stored_feature_view.created_timestamp = current_time - timedelta(days=1) - stored_feature_view.last_updated_timestamp = current_time start_date = current_time - timedelta(days=1) end_date = current_time - stored_feature_view.materialization_intervals.append((start_date, end_date)) - - stored_feature_view_proto = stored_feature_view.to_proto() - serialized_proto = stored_feature_view_proto.SerializeToString() + updated_feature_view.materialization_intervals.append((start_date, end_date)) - # Update the entity i.e. here it's simply the name - updated_feature_view = FeatureView( + # Update the Feature View, i.e. simply update the name + second_updated_feature_view = FeatureView( name="my-feature-view-1", - entities=[], + entities=[entity], ttl=timedelta(days=1), source=batch_source, ) - updated_feature_view.last_updated_timestamp = current_time - updated_feature_view.materialization_intervals = [] - - updated_feature_view.update_meta(serialized_proto) - - assert ( - updated_feature_view.created_timestamp == stored_feature_view.created_timestamp - ) - assert updated_feature_view.last_updated_timestamp == current_time - assert ( - updated_feature_view.materialization_intervals is not None - and len(updated_feature_view.materialization_intervals) == 1 + second_updated_feature_view.update_materialization_intervals( + updated_feature_view.to_proto().meta.materialization_intervals ) + assert len(second_updated_feature_view.materialization_intervals) == 1 assert ( - updated_feature_view.materialization_intervals[0][0] - == stored_feature_view.materialization_intervals[0][0] + second_updated_feature_view.materialization_intervals[0][0] + == second_updated_feature_view.materialization_intervals[0][0] ) assert ( - updated_feature_view.materialization_intervals[0][1] - == stored_feature_view.materialization_intervals[0][1] + second_updated_feature_view.materialization_intervals[0][1] + == second_updated_feature_view.materialization_intervals[0][1] ) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 9ff3436ed09..a593a406af1 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime, timedelta from typing import Any, Dict, List import pandas as pd @@ -263,60 +262,3 @@ def test_from_proto_backwards_compatible_udf(): reserialized_proto.feature_transformation.udf_string == on_demand_feature_view.feature_transformation.udf_string ) - - -def test_update_meta(): - file_source = FileSource(name="my-file-source", path="test.parquet") - feature_view = FeatureView( - name="my-feature-view", - entities=[], - schema=[ - Field(name="feature1", dtype=Float32), - Field(name="feature2", dtype=Float32), - ], - source=file_source, - ) - sources = [feature_view] - stored_on_demand_feature_view = OnDemandFeatureView( - name="my-on-demand-feature-view", - sources=sources, - schema=[ - Field(name="output1", dtype=Float32), - Field(name="output2", dtype=Float32), - ], - feature_transformation=PandasTransformation( - udf=udf1, udf_string="udf1 source code" - ), - description="test", - mode="python", - ) - current_time = datetime.now() - stored_on_demand_feature_view.created_timestamp = current_time - timedelta(days=1) - stored_on_demand_feature_view.last_updated_timestamp = current_time - timedelta( - days=1 - ) - stored_on_demand_feature_view_proto = stored_on_demand_feature_view.to_proto() - serialized_proto = stored_on_demand_feature_view_proto.SerializeToString() - - updated_on_demand_feature_view = OnDemandFeatureView( - name="my-on-demand-feature-view-1", - sources=sources, - schema=[ - Field(name="output1", dtype=Float32), - Field(name="output2", dtype=Float32), - ], - feature_transformation=PandasTransformation( - udf=udf1, udf_string="udf1 source code" - ), - description="test", - mode="python", - ) - updated_on_demand_feature_view.last_updated_timestamp = current_time - - updated_on_demand_feature_view.update_meta(serialized_proto) - - assert ( - updated_on_demand_feature_view.created_timestamp - == stored_on_demand_feature_view.created_timestamp - ) - assert updated_on_demand_feature_view.last_updated_timestamp == current_time diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py index b4dca2fb44a..711d6125f34 100644 --- a/sdk/python/tests/unit/test_stream_feature_view.py +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -252,7 +252,7 @@ def test_stream_feature_view_copy(): assert sfv == copy.copy(sfv) -def test_update_meta(): +def test_update_materialization_intervals(): entity = Entity(name="driver_entity", join_keys=["test_key"]) stream_source = KafkaSource( name="kafka", @@ -286,14 +286,9 @@ def test_update_meta(): tags={}, ) current_time = datetime.now() - stored_stream_feature_view.created_timestamp = current_time - timedelta(days=1) - stored_stream_feature_view.last_updated_timestamp = current_time - timedelta(days=1) - start_date = current_time - timedelta(days=1) end_date = current_time stored_stream_feature_view.materialization_intervals.append((start_date, end_date)) - stored_stream_feature_view_proto = stored_stream_feature_view.to_proto() - serialized_proto = stored_stream_feature_view_proto.SerializeToString() # # Update the stream feature view i.e. here it's simply the name updated_stream_feature_view = StreamFeatureView( @@ -317,16 +312,11 @@ def test_update_meta(): udf=simple_udf, tags={}, ) - updated_stream_feature_view.last_updated_timestamp = current_time - updated_stream_feature_view.materialization_intervals = [] - - updated_stream_feature_view.update_meta(serialized_proto) - assert ( - updated_stream_feature_view.created_timestamp - == stored_stream_feature_view.created_timestamp + updated_stream_feature_view.update_materialization_intervals( + stored_stream_feature_view.to_proto().meta.materialization_intervals ) - assert updated_stream_feature_view.last_updated_timestamp == current_time + assert ( updated_stream_feature_view.materialization_intervals is not None and len(stored_stream_feature_view.materialization_intervals) == 1 diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index 1002e472d7c..2149a7727af 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -80,10 +80,9 @@ def validate_offline_online_store_consistency( fs.materialize_incremental(feature_views=[fv.name], end_date=now) updated_fv = fs.registry.get_feature_view(fv.name, fs.project) - # Check if created_timestamp and materialization_intervals was updated by the registry + # Check if materialization_intervals was updated by the registry assert ( - updated_fv.created_timestamp is not None - and updated_fv.materialization_intervals is not None + updated_fv.materialization_intervals is not None and len(updated_fv.materialization_intervals) > 0 ) From 770d92a54821002af416e1acfe8322cb44543b0f Mon Sep 17 00:00:00 2001 From: msistla96 Date: Tue, 11 Jun 2024 20:18:02 -0500 Subject: [PATCH 4/6] update materialization tests/fix a bug Signed-off-by: msistla96 --- sdk/python/feast/feature_view.py | 10 +++---- sdk/python/feast/infra/registry/registry.py | 4 +-- sdk/python/feast/infra/registry/remote.py | 1 + sdk/python/feast/infra/registry/sql.py | 4 ++- sdk/python/feast/registry_server.py | 5 ++-- .../registration/test_universal_registry.py | 29 +++++++++++++++++-- sdk/python/tests/unit/test_entity.py | 1 - sdk/python/tests/unit/test_feature_views.py | 15 +++++----- .../tests/unit/test_on_demand_feature_view.py | 1 + .../tests/unit/test_stream_feature_view.py | 11 +++---- sdk/python/tests/utils/e2e_test_validation.py | 28 ++++++++++++++++-- 11 files changed, 81 insertions(+), 28 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 0d1dbf04b7a..680155ee483 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -311,12 +311,12 @@ def with_join_key_map(self, join_key_map: Dict[str, str]): return cp - def update_materialization_intervals(self, existing_materialization_intervals): - if existing_materialization_intervals: + def update_materialization_intervals( + self, existing_materialization_intervals: List[Tuple[datetime, datetime]] + ): + if len(existing_materialization_intervals) > 0: for interval in existing_materialization_intervals: - self.materialization_intervals.append( - (interval.start_time.ToDatetime(), interval.end_time.ToDatetime()) - ) + self.materialization_intervals.append((interval[0], interval[1])) def to_proto(self) -> FeatureViewProto: """ diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index e4dd8fa69f7..d4fa4e5330b 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -423,11 +423,11 @@ def apply_feature_view( existing_feature_view_proto ) feature_view.created_timestamp = ( - existing_feature_view.created_timestamp.replace(tzinfo=None) + existing_feature_view.created_timestamp ) if isinstance(feature_view, (FeatureView, StreamFeatureView)): feature_view.update_materialization_intervals( - existing_feature_view_proto.meta.materialization_intervals + existing_feature_view.materialization_intervals ) feature_view_proto = feature_view.to_proto() feature_view_proto.spec.project = project diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 4336db232fb..37da649c1c9 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -276,6 +276,7 @@ def apply_materialization( start_date_timestamp.FromDatetime(start_date) end_date_timestamp.FromDatetime(end_date) + # TODO: for this to work for stream feature views, ApplyMaterializationRequest needs to be updated request = RegistryServer_pb2.ApplyMaterializationRequest( feature_view=feature_view.to_proto(), project=project, diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 8c936381a6e..3ead3a2ec57 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -701,7 +701,9 @@ def _apply_object( ) if isinstance(obj, (FeatureView, StreamFeatureView)): obj.update_materialization_intervals( - deserialized_proto.meta.materialization_intervals + type(obj) + .from_proto(deserialized_proto) + .materialization_intervals ) values = { proto_field_name: obj.to_proto().SerializeToString(), diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 85038ad6ff3..444ae5797bb 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -3,6 +3,7 @@ import grpc from google.protobuf.empty_pb2 import Empty +from pytz import utc from feast import FeatureStore from feast.data_source import DataSource @@ -293,10 +294,10 @@ def ApplyMaterialization( feature_view=FeatureView.from_proto(request.feature_view), project=request.project, start_date=datetime.fromtimestamp( - request.start_date.seconds + request.start_date.nanos / 1e9 + request.start_date.seconds + request.start_date.nanos / 1e9, tz=utc ), end_date=datetime.fromtimestamp( - request.end_date.seconds + request.end_date.nanos / 1e9 + request.end_date.seconds + request.end_date.nanos / 1e9, tz=utc ), commit=request.commit, ) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index a1bd233ff13..17bbdcf39cf 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -14,7 +14,7 @@ import logging import os import time -from datetime import timedelta +from datetime import datetime, timedelta from tempfile import mkstemp from unittest import mock @@ -22,6 +22,7 @@ import pandas as pd import pytest from pytest_lazyfixture import lazy_fixture +from pytz import utc from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.minio import MinioContainer @@ -783,6 +784,28 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: assert ( updated_feature_view.created_timestamp is not None and updated_feature_view.created_timestamp == feature_view.created_timestamp + and len(updated_feature_view.materialization_intervals) == 0 + ) + + # Simulate materialization + current_date = datetime.utcnow() + end_date = current_date.replace(tzinfo=utc) + start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc) + test_registry.apply_materialization( + updated_feature_view, project, start_date, end_date + ) + materialized_feature_view = test_registry.get_feature_view( + "my_feature_view_1", project + ) + + # Check if materialization_intervals was updated by the registry + assert ( + materialized_feature_view.created_timestamp is not None + and materialized_feature_view.created_timestamp + == feature_view.created_timestamp + and len(materialized_feature_view.materialization_intervals) > 0 + and materialized_feature_view.materialization_intervals[0][0] == start_date + and materialized_feature_view.materialization_intervals[0][1] == end_date ) # Modify sfv by changing the dtype @@ -844,13 +867,13 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: # The created_timestamp for the stream feature view should be set to the created_timestamp value stored from the # previous apply + # Materialization_intervals is not set assert ( updated_sfv.created_timestamp is not None and updated_sfv.created_timestamp == existing_sfv.created_timestamp + and len(updated_sfv.materialization_intervals) == 0 ) - test_registry.teardown() - @pytest.mark.integration @pytest.mark.parametrize( diff --git a/sdk/python/tests/unit/test_entity.py b/sdk/python/tests/unit/test_entity.py index 522467f6bfa..78f71231049 100644 --- a/sdk/python/tests/unit/test_entity.py +++ b/sdk/python/tests/unit/test_entity.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import assertpy import pytest diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index da43bbabbaf..b387f55d8b0 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -3,6 +3,7 @@ import pytest from typeguard import TypeCheckError +from feast import utils from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat from feast.data_source import KafkaSource @@ -138,13 +139,13 @@ def test_update_materialization_intervals(): source=batch_source, ) updated_feature_view.update_materialization_intervals( - stored_feature_view.to_proto().meta.materialization_intervals + stored_feature_view.materialization_intervals ) assert len(updated_feature_view.materialization_intervals) == 0 - current_time = datetime.now() - start_date = current_time - timedelta(days=1) - end_date = current_time + current_time = datetime.utcnow() + start_date = utils.make_tzaware(current_time - timedelta(days=1)) + end_date = utils.make_tzaware(current_time) updated_feature_view.materialization_intervals.append((start_date, end_date)) # Update the Feature View, i.e. simply update the name @@ -156,14 +157,14 @@ def test_update_materialization_intervals(): ) second_updated_feature_view.update_materialization_intervals( - updated_feature_view.to_proto().meta.materialization_intervals + updated_feature_view.materialization_intervals ) assert len(second_updated_feature_view.materialization_intervals) == 1 assert ( second_updated_feature_view.materialization_intervals[0][0] - == second_updated_feature_view.materialization_intervals[0][0] + == updated_feature_view.materialization_intervals[0][0] ) assert ( second_updated_feature_view.materialization_intervals[0][1] - == second_updated_feature_view.materialization_intervals[0][1] + == updated_feature_view.materialization_intervals[0][1] ) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index a593a406af1..d9cc5dee50d 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from typing import Any, Dict, List import pandas as pd diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py index 711d6125f34..77431666c30 100644 --- a/sdk/python/tests/unit/test_stream_feature_view.py +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -3,6 +3,7 @@ import pytest +from feast import utils from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat @@ -285,12 +286,12 @@ def test_update_materialization_intervals(): udf=simple_udf, tags={}, ) - current_time = datetime.now() - start_date = current_time - timedelta(days=1) - end_date = current_time + current_time = datetime.utcnow() + start_date = utils.make_tzaware(current_time - timedelta(days=1)) + end_date = utils.make_tzaware(current_time) stored_stream_feature_view.materialization_intervals.append((start_date, end_date)) - # # Update the stream feature view i.e. here it's simply the name + # Update the stream feature view i.e. here it's simply the name updated_stream_feature_view = StreamFeatureView( name="test kafka stream feature view updated", entities=[entity], @@ -314,7 +315,7 @@ def test_update_materialization_intervals(): ) updated_stream_feature_view.update_materialization_intervals( - stored_stream_feature_view.to_proto().meta.materialization_intervals + stored_stream_feature_view.materialization_intervals ) assert ( diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index 2149a7727af..cbd95e9bbf3 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -80,10 +80,34 @@ def validate_offline_online_store_consistency( fs.materialize_incremental(feature_views=[fv.name], end_date=now) updated_fv = fs.registry.get_feature_view(fv.name, fs.project) + print( + "Materialized first interval equal", + updated_fv.materialization_intervals[0][0], + start_date, + ) + print( + "Materialized first interval equal", + updated_fv.materialization_intervals[0][1], + end_date, + ) + print( + "Materialized second interval equal", + updated_fv.materialization_intervals[1][0], + end_date, + ) + print( + "Materialized second interval equal", + updated_fv.materialization_intervals[1][1], + now, + ) + # Check if materialization_intervals was updated by the registry assert ( - updated_fv.materialization_intervals is not None - and len(updated_fv.materialization_intervals) > 0 + len(updated_fv.materialization_intervals) > 0 + and updated_fv.materialization_intervals[0][0] == start_date + and updated_fv.materialization_intervals[0][1] == end_date + and updated_fv.materialization_intervals[1][0] == end_date + and updated_fv.materialization_intervals[1][1] == now.replace(tzinfo=utc) ) # check result of materialize_incremental() From 65a7e2a432a7bd91e7f7a47220b202ba6e24d2d6 Mon Sep 17 00:00:00 2001 From: msistla96 Date: Tue, 11 Jun 2024 23:47:28 -0500 Subject: [PATCH 5/6] Add materialization test before and after an apply Signed-off-by: msistla96 --- .../registration/test_universal_registry.py | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 17bbdcf39cf..181067d495c 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -742,6 +742,27 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and feature_view.entities[0] == "fs1_my_entity_1" ) + # Simulate materialization + current_date = datetime.utcnow() + end_date = current_date.replace(tzinfo=utc) + start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc) + test_registry.apply_materialization(feature_view, project, start_date, end_date) + materialized_feature_view = test_registry.get_feature_view( + "my_feature_view_1", project + ) + + # Check if: + # 1. Materialization_intervals was updated by the registry + # 2. created_timestamp is set to the created_timestamp value stored from the previous apply + assert ( + materialized_feature_view.created_timestamp is not None + and materialized_feature_view.created_timestamp + == feature_view.created_timestamp + and len(materialized_feature_view.materialization_intervals) > 0 + and materialized_feature_view.materialization_intervals[0][0] == start_date + and materialized_feature_view.materialization_intervals[0][1] == end_date + ) + # Modify fv1 by changing a single dtype updated_fv1 = FeatureView( name="my_feature_view_1", @@ -778,34 +799,13 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and updated_feature_view.entities[0] == "fs1_my_entity_1" ) - # The created_timestamp for the feature view should be set to the created_timestamp value stored from the - # previous apply - + # Check if materialization_intervals and created_timestamp values propagates on each apply assert ( updated_feature_view.created_timestamp is not None and updated_feature_view.created_timestamp == feature_view.created_timestamp - and len(updated_feature_view.materialization_intervals) == 0 - ) - - # Simulate materialization - current_date = datetime.utcnow() - end_date = current_date.replace(tzinfo=utc) - start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc) - test_registry.apply_materialization( - updated_feature_view, project, start_date, end_date - ) - materialized_feature_view = test_registry.get_feature_view( - "my_feature_view_1", project - ) - - # Check if materialization_intervals was updated by the registry - assert ( - materialized_feature_view.created_timestamp is not None - and materialized_feature_view.created_timestamp - == feature_view.created_timestamp - and len(materialized_feature_view.materialization_intervals) > 0 - and materialized_feature_view.materialization_intervals[0][0] == start_date - and materialized_feature_view.materialization_intervals[0][1] == end_date + and len(updated_feature_view.materialization_intervals) == 1 + and updated_feature_view.materialization_intervals[0][0] == start_date + and updated_feature_view.materialization_intervals[0][1] == end_date ) # Modify sfv by changing the dtype From 2d4ae0bd77130cd50e4e61d93d5c30475a004147 Mon Sep 17 00:00:00 2001 From: msistla96 Date: Wed, 12 Jun 2024 00:36:20 -0500 Subject: [PATCH 6/6] fix code/add tests to ensure materialization intervals updates when empty only Signed-off-by: msistla96 --- sdk/python/feast/feature_view.py | 5 ++- .../registration/test_universal_registry.py | 31 +++++++++++++++++-- sdk/python/tests/utils/e2e_test_validation.py | 23 +------------- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 680155ee483..1a85a4b90c0 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -314,7 +314,10 @@ def with_join_key_map(self, join_key_map: Dict[str, str]): def update_materialization_intervals( self, existing_materialization_intervals: List[Tuple[datetime, datetime]] ): - if len(existing_materialization_intervals) > 0: + if ( + len(existing_materialization_intervals) > 0 + and len(self.materialization_intervals) == 0 + ): for interval in existing_materialization_intervals: self.materialization_intervals.append((interval[0], interval[1])) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 181067d495c..5469f3857aa 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -751,9 +751,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: "my_feature_view_1", project ) - # Check if: - # 1. Materialization_intervals was updated by the registry - # 2. created_timestamp is set to the created_timestamp value stored from the previous apply + # Check if created_timestamp, along with materialized_intervals are updated assert ( materialized_feature_view.created_timestamp is not None and materialized_feature_view.created_timestamp @@ -776,6 +774,10 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ttl=timedelta(minutes=5), ) + # Check that these fields are empty before apply + assert updated_fv1.created_timestamp is None + assert len(updated_fv1.materialization_intervals) == 0 + # Apply the modified fv1 test_registry.apply_feature_view(updated_fv1, project) @@ -800,6 +802,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) # Check if materialization_intervals and created_timestamp values propagates on each apply + # materialization_intervals will populate only when it's empty assert ( updated_feature_view.created_timestamp is not None and updated_feature_view.created_timestamp == feature_view.created_timestamp @@ -808,6 +811,28 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: and updated_feature_view.materialization_intervals[0][1] == end_date ) + # Simulate materialization a second time + current_date = datetime.utcnow() + end_date_1 = current_date.replace(tzinfo=utc) + start_date_1 = (current_date - timedelta(days=1)).replace(tzinfo=utc) + test_registry.apply_materialization( + updated_feature_view, project, start_date_1, end_date_1 + ) + materialized_feature_view_1 = test_registry.get_feature_view( + "my_feature_view_1", project + ) + + assert ( + materialized_feature_view_1.created_timestamp is not None + and materialized_feature_view_1.created_timestamp + == feature_view.created_timestamp + and len(materialized_feature_view_1.materialization_intervals) == 2 + and materialized_feature_view_1.materialization_intervals[0][0] == start_date + and materialized_feature_view_1.materialization_intervals[0][1] == end_date + and materialized_feature_view_1.materialization_intervals[1][0] == start_date_1 + and materialized_feature_view_1.materialization_intervals[1][1] == end_date_1 + ) + # Modify sfv by changing the dtype sfv = StreamFeatureView( diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index cbd95e9bbf3..d9104bae420 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -80,30 +80,9 @@ def validate_offline_online_store_consistency( fs.materialize_incremental(feature_views=[fv.name], end_date=now) updated_fv = fs.registry.get_feature_view(fv.name, fs.project) - print( - "Materialized first interval equal", - updated_fv.materialization_intervals[0][0], - start_date, - ) - print( - "Materialized first interval equal", - updated_fv.materialization_intervals[0][1], - end_date, - ) - print( - "Materialized second interval equal", - updated_fv.materialization_intervals[1][0], - end_date, - ) - print( - "Materialized second interval equal", - updated_fv.materialization_intervals[1][1], - now, - ) - # Check if materialization_intervals was updated by the registry assert ( - len(updated_fv.materialization_intervals) > 0 + len(updated_fv.materialization_intervals) == 2 and updated_fv.materialization_intervals[0][0] == start_date and updated_fv.materialization_intervals[0][1] == end_date and updated_fv.materialization_intervals[1][0] == end_date