diff --git a/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml b/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml index 40483cc0c43..a66a41e8466 100644 --- a/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml +++ b/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml @@ -3,4 +3,4 @@ kind: Secret metadata: name: openlineage-secret stringData: - api_key: your-marquez-api-key + api_key: your-marquez-api-key # pragma: allowlist secret diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index f3437b08f30..e26595ad4af 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -764,7 +764,7 @@ def apply_feature_view( self._prepare_registry_for_changes(project) assert self.cached_registry_proto - self._check_conflicting_feature_view_names(feature_view) + self._check_conflicting_feature_view_names(feature_view, project) existing_feature_views_of_same_type: RepeatedCompositeFieldContainer if isinstance(feature_view, StreamFeatureView): existing_feature_views_of_same_type = ( @@ -1360,23 +1360,32 @@ def _get_registry_proto( return registry_proto - def _check_conflicting_feature_view_names(self, feature_view: BaseFeatureView): - name_to_fv_protos = self._existing_feature_view_names_to_fvs() + def _check_conflicting_feature_view_names( + self, feature_view: BaseFeatureView, project: str + ): + name_to_fv_protos = self._existing_feature_view_names_to_fvs(project) if feature_view.name in name_to_fv_protos: if not isinstance( name_to_fv_protos.get(feature_view.name), feature_view.proto_class ): raise ConflictingFeatureViewNames(feature_view.name) - def _existing_feature_view_names_to_fvs(self) -> Dict[str, Message]: + def _existing_feature_view_names_to_fvs(self, project: str) -> Dict[str, Message]: assert self.cached_registry_proto odfvs = { fv.spec.name: fv for fv in self.cached_registry_proto.on_demand_feature_views + if fv.spec.project == project + } + fvs = { + fv.spec.name: fv + for fv in self.cached_registry_proto.feature_views + if fv.spec.project == project } - fvs = {fv.spec.name: fv for fv in self.cached_registry_proto.feature_views} sfv = { - fv.spec.name: fv for fv in self.cached_registry_proto.stream_feature_views + fv.spec.name: fv + for fv in self.cached_registry_proto.stream_feature_views + if fv.spec.project == project } return {**odfvs, **fvs, **sfv} diff --git a/sdk/python/tests/unit/infra/registry/test_file_registry.py b/sdk/python/tests/unit/infra/registry/test_file_registry.py new file mode 100644 index 00000000000..07b443e7eaa --- /dev/null +++ b/sdk/python/tests/unit/infra/registry/test_file_registry.py @@ -0,0 +1,95 @@ +import tempfile +from datetime import timedelta + +import pytest + +from feast import Field +from feast.data_source import PushSource +from feast.entity import Entity +from feast.errors import ConflictingFeatureViewNames +from feast.feature_view import FeatureView +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.registry.registry import Registry +from feast.repo_config import RegistryConfig +from feast.stream_feature_view import StreamFeatureView +from feast.types import Float32 +from feast.value_type import ValueType + + +@pytest.fixture +def file_registry(): + fd, registry_path = tempfile.mkstemp() + config = RegistryConfig(path=registry_path) + registry = Registry("test_project", config, None) + yield registry + registry.teardown() + + +def _make_sources(): + file_source = FileSource( + path="driver_stats.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + push_source = PushSource(name="driver_push", batch_source=file_source) + return file_source, push_source + + +def test_same_project_name_conflict_batch_vs_stream(file_registry): + """A FeatureView and StreamFeatureView with the same name in the same project must raise ConflictingFeatureViewNames.""" + entity = Entity(name="driver", value_type=ValueType.STRING, join_keys=["driver_id"]) + file_registry.apply_entity(entity, "test_project") + + file_source, push_source = _make_sources() + + batch_view = FeatureView( + name="driver_activity", + entities=[entity], + ttl=timedelta(days=1), + schema=[Field(name="conv_rate", dtype=Float32)], + source=file_source, + ) + file_registry.apply_feature_view(batch_view, "test_project") + + stream_view = StreamFeatureView( + name="driver_activity", + source=push_source, + entities=[entity], + schema=[Field(name="conv_rate", dtype=Float32)], + timestamp_field="event_timestamp", + ) + with pytest.raises(ConflictingFeatureViewNames): + file_registry.apply_feature_view(stream_view, "test_project") + + +def test_cross_project_name_does_not_conflict_batch_vs_stream(file_registry): + """A FeatureView in project_a and a StreamFeatureView with the same name in project_b + must not raise ConflictingFeatureViewNames. + + Before the fix, _existing_feature_view_names_to_fvs scanned all projects, + so the type mismatch between the two projects triggered a spurious error. + """ + entity = Entity(name="driver", value_type=ValueType.STRING, join_keys=["driver_id"]) + file_registry.apply_entity(entity, "project_a") + file_registry.apply_entity(entity, "project_b") + + file_source, push_source = _make_sources() + + batch_view = FeatureView( + name="driver_activity", + entities=[entity], + ttl=timedelta(days=1), + schema=[Field(name="conv_rate", dtype=Float32)], + source=file_source, + ) + file_registry.apply_feature_view(batch_view, "project_a") + + stream_view = StreamFeatureView( + name="driver_activity", + source=push_source, + entities=[entity], + schema=[Field(name="conv_rate", dtype=Float32)], + timestamp_field="event_timestamp", + ) + # Must not raise — same name, different project, different type. + file_registry.apply_feature_view(stream_view, "project_b")