From 263c52e9a62c6d414e108fe21cc085870c9040c8 Mon Sep 17 00:00:00 2001 From: ntkathole Date: Thu, 21 May 2026 14:35:49 +0530 Subject: [PATCH 1/2] fix: Fix shared SQL registry crash - avoid unnecessary UDF deserialization in proto cache building Signed-off-by: ntkathole --- sdk/python/feast/feature_view.py | 13 +- .../feast/infra/registry/base_registry.py | 8 + .../feast/infra/registry/caching_registry.py | 28 ++-- .../infra/registry/proto_registry_utils.py | 44 ++++-- sdk/python/feast/infra/registry/registry.py | 14 +- sdk/python/feast/infra/registry/remote.py | 4 + sdk/python/feast/infra/registry/snowflake.py | 148 ++++++++++++++---- sdk/python/feast/infra/registry/sql.py | 102 ++++++++---- sdk/python/feast/on_demand_feature_view.py | 15 +- sdk/python/feast/registry_server.py | 5 + sdk/python/feast/stream_feature_view.py | 16 +- .../unit/infra/registry/test_sql_registry.py | 117 +++++++++++++- 12 files changed, 421 insertions(+), 93 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index da11a7958ca..149523eff66 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -113,6 +113,7 @@ class FeatureView(BaseFeatureView): materialization_intervals: List[Tuple[datetime, datetime]] mode: Optional[Union["TransformationMode", str]] enable_validation: bool + _raw_feature_transformation_proto: Optional[Message] = None def __init__( self, @@ -481,7 +482,9 @@ def to_proto_spec( ] feature_transformation_proto = None - if hasattr(self, "feature_transformation") and self.feature_transformation: + if getattr(self, "_raw_feature_transformation_proto", None) is not None: + feature_transformation_proto = self._raw_feature_transformation_proto + elif hasattr(self, "feature_transformation") and self.feature_transformation: feature_transformation_proto = transformation_to_proto( self.feature_transformation ) @@ -636,8 +639,14 @@ def _from_proto_internal( source=source_views if source_views else batch_source, # type: ignore[arg-type] sink_source=batch_source if source_views else None, mode=mode, - feature_transformation=transformation, + feature_transformation=transformation + if not skip_udf + else feature_transformation_proto, # type: ignore[arg-type] ) + if skip_udf: + feature_view._raw_feature_transformation_proto = ( + feature_transformation_proto + ) else: mode_from_spec = ( feature_view_proto.spec.mode if feature_view_proto.spec.mode else None diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index da4f291bc44..f0478312079 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -368,6 +368,7 @@ def list_stream_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[StreamFeatureView]: """ Retrieve a list of stream feature views from the registry @@ -376,6 +377,7 @@ def list_stream_feature_views( project: Filter stream feature views based on project name allow_cache: Whether to allow returning stream feature views from a cached registry tags: Filter by tags + skip_udf: Skip deserializing UDFs (for metadata-only operations) Returns: List of stream feature views @@ -407,6 +409,7 @@ def list_on_demand_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[OnDemandFeatureView]: """ Retrieve a list of on demand feature views from the registry @@ -415,6 +418,7 @@ def list_on_demand_feature_views( project: Filter on demand feature views based on project name allow_cache: Whether to allow returning on demand feature views from a cached registry tags: Filter by tags + skip_udf: Skip deserializing UDFs (for metadata-only operations) Returns: List of on demand feature views @@ -446,6 +450,7 @@ def list_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[FeatureView]: """ Retrieve a list of feature views from the registry @@ -454,6 +459,7 @@ def list_feature_views( allow_cache: Allow returning feature views from the cached registry project: Filter feature views based on project name tags: Filter by tags + skip_udf: Skip deserializing UDFs (for metadata-only operations) Returns: List of feature views @@ -484,6 +490,7 @@ def list_all_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[BaseFeatureView]: """ Retrieve a list of feature views of all types from the registry @@ -492,6 +499,7 @@ def list_all_feature_views( allow_cache: Allow returning feature views from the cached registry project: Filter feature views based on project name tags: Filter by tags + skip_udf: Skip deserializing UDFs (for metadata-only operations) Returns: List of feature views diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index ad6714d9796..2e15272cf05 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -121,7 +121,7 @@ def get_any_feature_view( @abstractmethod def _list_all_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs: Any ) -> List[BaseFeatureView]: pass @@ -130,13 +130,14 @@ def list_all_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[BaseFeatureView]: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_all_feature_views( - self.cached_registry_proto, project, tags + self.cached_registry_proto, project, tags, skip_udf=skip_udf ) - return self._list_all_feature_views(project, tags) + return self._list_all_feature_views(project, tags, skip_udf=skip_udf) @abstractmethod def _get_feature_view(self, name: str, project: str) -> FeatureView: @@ -154,7 +155,7 @@ def get_feature_view( @abstractmethod def _list_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs: Any ) -> List[FeatureView]: pass @@ -163,13 +164,14 @@ def list_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[FeatureView]: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_feature_views( - self.cached_registry_proto, project, tags + self.cached_registry_proto, project, tags, skip_udf=skip_udf ) - return self._list_feature_views(project, tags) + return self._list_feature_views(project, tags, skip_udf=skip_udf) @abstractmethod def _get_on_demand_feature_view( @@ -189,7 +191,7 @@ def get_on_demand_feature_view( @abstractmethod def _list_on_demand_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs: Any ) -> List[OnDemandFeatureView]: pass @@ -198,13 +200,14 @@ def list_on_demand_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[OnDemandFeatureView]: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_on_demand_feature_views( - self.cached_registry_proto, project, tags + self.cached_registry_proto, project, tags, skip_udf=skip_udf ) - return self._list_on_demand_feature_views(project, tags) + return self._list_on_demand_feature_views(project, tags, skip_udf=skip_udf) @abstractmethod def _get_stream_feature_view(self, name: str, project: str) -> StreamFeatureView: @@ -222,7 +225,7 @@ def get_stream_feature_view( @abstractmethod def _list_stream_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs: Any ) -> List[StreamFeatureView]: pass @@ -231,13 +234,14 @@ def list_stream_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[StreamFeatureView]: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_stream_feature_views( - self.cached_registry_proto, project, tags + self.cached_registry_proto, project, tags, skip_udf=skip_udf ) - return self._list_stream_feature_views(project, tags) + return self._list_stream_feature_views(project, tags, skip_udf=skip_udf) @abstractmethod def _get_feature_service(self, name: str, project: str) -> FeatureService: diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index 82b7f3e8aaa..e315e3b706c 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -57,15 +57,19 @@ def wrapper( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]], + **kwargs, ): nonlocal cache_key, cache_value - key = tuple([id(registry_proto), registry_proto.version_id, project, tags]) + kwargs_key = tuple(sorted(kwargs.items())) if kwargs else () + key = tuple( + [id(registry_proto), registry_proto.version_id, project, tags, kwargs_key] + ) if key == cache_key: return cache_value else: - cache_value = func(registry_proto, project, tags) + cache_value = func(registry_proto, project, tags, **kwargs) cache_key = key return cache_value @@ -279,31 +283,42 @@ def list_feature_services( @registry_proto_cache_with_tags def list_all_feature_views( - registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] + registry_proto: RegistryProto, + project: str, + tags: Optional[dict[str, str]], + skip_udf: bool = False, ) -> List[BaseFeatureView]: return ( - list_feature_views(registry_proto, project, tags) - + list_stream_feature_views(registry_proto, project, tags) - + list_on_demand_feature_views(registry_proto, project, tags) + list_feature_views(registry_proto, project, tags, skip_udf=skip_udf) + + list_stream_feature_views(registry_proto, project, tags, skip_udf=skip_udf) + + list_on_demand_feature_views(registry_proto, project, tags, skip_udf=skip_udf) ) @registry_proto_cache_with_tags def list_feature_views( - registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] + registry_proto: RegistryProto, + project: str, + tags: Optional[dict[str, str]], + skip_udf: bool = False, ) -> List[FeatureView]: feature_views: List[FeatureView] = [] for feature_view_proto in registry_proto.feature_views: if feature_view_proto.spec.project == project and utils.has_all_tags( feature_view_proto.spec.tags, tags ): - feature_views.append(FeatureView.from_proto(feature_view_proto)) + feature_views.append( + FeatureView.from_proto(feature_view_proto, skip_udf=skip_udf) + ) return feature_views @registry_proto_cache_with_tags def list_stream_feature_views( - registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] + registry_proto: RegistryProto, + project: str, + tags: Optional[dict[str, str]], + skip_udf: bool = False, ) -> List[StreamFeatureView]: stream_feature_views = [] for stream_feature_view in registry_proto.stream_feature_views: @@ -311,14 +326,17 @@ def list_stream_feature_views( stream_feature_view.spec.tags, tags ): stream_feature_views.append( - StreamFeatureView.from_proto(stream_feature_view) + StreamFeatureView.from_proto(stream_feature_view, skip_udf=skip_udf) ) return stream_feature_views @registry_proto_cache_with_tags def list_on_demand_feature_views( - registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] + registry_proto: RegistryProto, + project: str, + tags: Optional[dict[str, str]], + skip_udf: bool = False, ) -> List[OnDemandFeatureView]: on_demand_feature_views = [] for on_demand_feature_view in registry_proto.on_demand_feature_views: @@ -326,7 +344,9 @@ def list_on_demand_feature_views( on_demand_feature_view.spec.tags, tags ): on_demand_feature_views.append( - OnDemandFeatureView.from_proto(on_demand_feature_view) + OnDemandFeatureView.from_proto( + on_demand_feature_view, skip_udf=skip_udf + ) ) return on_demand_feature_views diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index e26595ad4af..c7d638af125 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -877,12 +877,13 @@ def list_stream_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[StreamFeatureView]: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) return proto_registry_utils.list_stream_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) def list_on_demand_feature_views( @@ -890,12 +891,13 @@ def list_on_demand_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[OnDemandFeatureView]: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) return proto_registry_utils.list_on_demand_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) def get_on_demand_feature_view( @@ -978,12 +980,13 @@ def list_all_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[BaseFeatureView]: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) return proto_registry_utils.list_all_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) def get_any_feature_view( @@ -999,11 +1002,14 @@ def list_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[FeatureView]: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - return proto_registry_utils.list_feature_views(registry_proto, project, tags) + return proto_registry_utils.list_feature_views( + registry_proto, project, tags, skip_udf=skip_udf + ) def get_feature_view( self, name: str, project: str, allow_cache: bool = False diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index c553a55f754..ac5961cd677 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -268,6 +268,7 @@ def list_stream_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[StreamFeatureView]: request = RegistryServer_pb2.ListStreamFeatureViewsRequest( project=project, allow_cache=allow_cache, tags=tags @@ -292,6 +293,7 @@ def list_on_demand_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[OnDemandFeatureView]: request = RegistryServer_pb2.ListOnDemandFeatureViewsRequest( project=project, allow_cache=allow_cache, tags=tags @@ -320,6 +322,7 @@ def list_all_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[BaseFeatureView]: request = RegistryServer_pb2.ListAllFeatureViewsRequest( project=project, allow_cache=allow_cache, tags=tags @@ -347,6 +350,7 @@ def list_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[FeatureView]: request = RegistryServer_pb2.ListFeatureViewsRequest( project=project, allow_cache=allow_cache, tags=tags diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index 6382fa1c010..ca98ed3a0c9 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -643,25 +643,30 @@ def list_all_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[BaseFeatureView]: if allow_cache: registry_proto = self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_all_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) return ( cast( list[BaseFeatureView], - self.list_feature_views(project, allow_cache, tags), + self.list_feature_views(project, allow_cache, tags, skip_udf=skip_udf), ) + cast( list[BaseFeatureView], - self.list_stream_feature_views(project, allow_cache, tags), + self.list_stream_feature_views( + project, allow_cache, tags, skip_udf=skip_udf + ), ) + cast( list[BaseFeatureView], - self.list_on_demand_feature_views(project, allow_cache, tags), + self.list_on_demand_feature_views( + project, allow_cache, tags, skip_udf=skip_udf + ), ) ) @@ -859,11 +864,12 @@ def list_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[FeatureView]: if allow_cache: registry_proto = self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) return self._list_objects( "FEATURE_VIEWS", @@ -872,6 +878,7 @@ def list_feature_views( FeatureView, "FEATURE_VIEW_PROTO", tags=tags, + skip_udf=skip_udf, ) def list_on_demand_feature_views( @@ -879,11 +886,12 @@ def list_on_demand_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[OnDemandFeatureView]: if allow_cache: registry_proto = self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_on_demand_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) return self._list_objects( "ON_DEMAND_FEATURE_VIEWS", @@ -892,6 +900,7 @@ def list_on_demand_feature_views( OnDemandFeatureView, "ON_DEMAND_FEATURE_VIEW_PROTO", tags=tags, + skip_udf=skip_udf, ) def list_saved_datasets( @@ -919,11 +928,12 @@ def list_stream_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[StreamFeatureView]: if allow_cache: registry_proto = self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_stream_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) return self._list_objects( "STREAM_FEATURE_VIEWS", @@ -931,6 +941,7 @@ def list_stream_feature_views( StreamFeatureViewProto, StreamFeatureView, "STREAM_FEATURE_VIEW_PROTO", + skip_udf=skip_udf, tags=tags, ) @@ -957,7 +968,20 @@ def _list_objects( python_class: Any, proto_field_name: str, tags: Optional[dict[str, str]] = None, + proto_only: bool = False, + skip_udf: bool = False, ): + """ + Args: + proto_only: If True, return raw protobuf objects without calling + from_proto(). Used by proto() to build the RegistryProto cache + efficiently — avoids the from_proto()/to_proto() round-trip and + works uniformly for all object types (entities, data sources, etc.). + skip_udf: If True, call from_proto() but skip deserializing UDFs + (dill.loads). Returns Python objects suitable for filtering and + display without requiring the UDF's source module to be installed. + Only relevant for feature view types. + """ with GetSnowflakeConnection(self.registry_config) as conn: query = f""" SELECT @@ -971,11 +995,17 @@ def _list_objects( if not df.empty: objects = [] for row in df.iterrows(): - obj = python_class.from_proto( - proto_class.FromString(row[1][proto_field_name]) - ) - if has_all_tags(obj.tags, tags): - objects.append(obj) + proto = proto_class.FromString(row[1][proto_field_name]) + if proto_only: + objects.append(proto) + else: + obj = ( + python_class.from_proto(proto, skip_udf=skip_udf) + if skip_udf + else python_class.from_proto(proto) + ) + if has_all_tags(obj.tags, tags): + objects.append(obj) return objects return [] @@ -1134,28 +1164,90 @@ def process_project(project: Project): r.projects.extend([project.to_proto()]) last_updated_timestamps.append(last_updated_timestamp) - for lister, registry_proto_field in [ - (self.list_entities, r.entities), - (self.list_feature_views, r.feature_views), - (self.list_data_sources, r.data_sources), - (self.list_on_demand_feature_views, r.on_demand_feature_views), - (self.list_stream_feature_views, r.stream_feature_views), - (self.list_feature_services, r.feature_services), - (self.list_saved_datasets, r.saved_datasets), - (self.list_validation_references, r.validation_references), - (self.list_permissions, r.permissions), + # proto_only=True: return raw protos without calling from_proto(), + # which would trigger dill.loads() on UDFs and fail for cross-project + # modules. _list_objects hits the DB directly (no cache), avoiding + # infinite recursion since proto() itself builds the cache. + for ( + table, + proto_class, + python_class, + proto_field_name, + registry_proto_field, + ) in [ + ("ENTITIES", EntityProto, Entity, "ENTITY_PROTO", r.entities), + ( + "FEATURE_VIEWS", + FeatureViewProto, + FeatureView, + "FEATURE_VIEW_PROTO", + r.feature_views, + ), + ( + "DATA_SOURCES", + DataSourceProto, + DataSource, + "DATA_SOURCE_PROTO", + r.data_sources, + ), + ( + "ON_DEMAND_FEATURE_VIEWS", + OnDemandFeatureViewProto, + OnDemandFeatureView, + "ON_DEMAND_FEATURE_VIEW_PROTO", + r.on_demand_feature_views, + ), + ( + "STREAM_FEATURE_VIEWS", + StreamFeatureViewProto, + StreamFeatureView, + "STREAM_FEATURE_VIEW_PROTO", + r.stream_feature_views, + ), + ( + "FEATURE_SERVICES", + FeatureServiceProto, + FeatureService, + "FEATURE_SERVICE_PROTO", + r.feature_services, + ), + ( + "SAVED_DATASETS", + SavedDatasetProto, + SavedDataset, + "SAVED_DATASET_PROTO", + r.saved_datasets, + ), + ( + "VALIDATION_REFERENCES", + ValidationReferenceProto, + ValidationReference, + "VALIDATION_REFERENCE_PROTO", + r.validation_references, + ), + ( + "PERMISSIONS", + PermissionProto, + Permission, + "PERMISSION_PROTO", + r.permissions, + ), ]: - # Always bypass cache here: proto() builds the cache, so using - # allow_cache=True would cause infinite recursion via refresh(). - objs: List[Any] = lister(project_name, False) # type: ignore + objs = self._list_objects( + table, + project_name, + proto_class, + python_class, + proto_field_name, + proto_only=True, + ) if objs: - obj_protos = [obj.to_proto() for obj in objs] - for obj_proto in obj_protos: + for obj_proto in objs: if "spec" in obj_proto.DESCRIPTOR.fields_by_name: obj_proto.spec.project = project_name else: obj_proto.project = project_name - registry_proto_field.extend(obj_protos) + registry_proto_field.extend(objs) # This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783, # the registry proto only has a single infra field, which we're currently setting as the "last" project. diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index ae09c8e52b6..4b02f999a0d 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -387,7 +387,7 @@ def _get_stream_feature_view(self, name: str, project: str): ) def _list_stream_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[StreamFeatureView]: return self._list_objects( stream_feature_views, @@ -396,6 +396,7 @@ def _list_stream_feature_views( StreamFeatureView, "feature_view_proto", tags=tags, + **kwargs, ) def apply_entity(self, entity: Entity, project: str, commit: bool = True): @@ -457,20 +458,22 @@ def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView: return fv def _list_all_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[BaseFeatureView]: return ( cast( list[BaseFeatureView], - self._list_feature_views(project=project, tags=tags), + self._list_feature_views(project=project, tags=tags, **kwargs), ) + cast( list[BaseFeatureView], - self._list_stream_feature_views(project=project, tags=tags), + self._list_stream_feature_views(project=project, tags=tags, **kwargs), ) + cast( list[BaseFeatureView], - self._list_on_demand_feature_views(project=project, tags=tags), + self._list_on_demand_feature_views( + project=project, tags=tags, **kwargs + ), ) ) @@ -537,7 +540,7 @@ def _get_validation_reference(self, name: str, project: str) -> ValidationRefere ) def _list_validation_references( - self, project: str, tags: Optional[dict[str, str]] = None + self, project: str, tags: Optional[dict[str, str]] = None, **kwargs ) -> List[ValidationReference]: return self._list_objects( table=validation_references, @@ -546,13 +549,20 @@ def _list_validation_references( python_class=ValidationReference, proto_field_name="validation_reference_proto", tags=tags, + **kwargs, ) def _list_entities( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[Entity]: return self._list_objects( - entities, project, EntityProto, Entity, "entity_proto", tags=tags + entities, + project, + EntityProto, + Entity, + "entity_proto", + tags=tags, + **kwargs, ) def delete_entity(self, name: str, project: str, commit: bool = True): @@ -614,7 +624,7 @@ def _get_data_source(self, name: str, project: str) -> DataSource: ) def _list_data_sources( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[DataSource]: return self._list_objects( data_sources, @@ -623,6 +633,7 @@ def _list_data_sources( DataSource, "data_source_proto", tags=tags, + **kwargs, ) def apply_data_source( @@ -878,7 +889,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): raise DataSourceObjectNotFoundException(name, project) def _list_feature_services( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[FeatureService]: return self._list_objects( feature_services, @@ -887,10 +898,11 @@ def _list_feature_services( FeatureService, "feature_service_proto", tags=tags, + **kwargs, ) def _list_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[FeatureView]: return self._list_objects( feature_views, @@ -899,10 +911,11 @@ def _list_feature_views( FeatureView, "feature_view_proto", tags=tags, + **kwargs, ) def _list_saved_datasets( - self, project: str, tags: Optional[dict[str, str]] = None + self, project: str, tags: Optional[dict[str, str]] = None, **kwargs ) -> List[SavedDataset]: return self._list_objects( saved_datasets, @@ -911,10 +924,11 @@ def _list_saved_datasets( SavedDataset, "saved_dataset_proto", tags=tags, + **kwargs, ) def _list_on_demand_feature_views( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[OnDemandFeatureView]: return self._list_objects( on_demand_feature_views, @@ -923,6 +937,7 @@ def _list_on_demand_feature_views( OnDemandFeatureView, "feature_view_proto", tags=tags, + **kwargs, ) def _list_project_metadata(self, project: str) -> List[ProjectMetadata]: @@ -1232,26 +1247,29 @@ def process_project(project: Project): r.projects.extend([project.to_proto()]) last_updated_timestamps.append(last_updated_timestamp) + # proto_only=True: return raw protos without calling from_proto(), + # which would trigger dill.loads() on UDFs and fail for cross-project + # modules. The _list_* helpers hit the DB directly (no cache), avoiding + # infinite recursion since proto() itself builds the cache. for lister, registry_proto_field in [ - (self.list_entities, r.entities), - (self.list_feature_views, r.feature_views), - (self.list_data_sources, r.data_sources), - (self.list_on_demand_feature_views, r.on_demand_feature_views), - (self.list_stream_feature_views, r.stream_feature_views), - (self.list_feature_services, r.feature_services), - (self.list_saved_datasets, r.saved_datasets), - (self.list_validation_references, r.validation_references), - (self.list_permissions, r.permissions), + (self._list_entities, r.entities), + (self._list_feature_views, r.feature_views), + (self._list_data_sources, r.data_sources), + (self._list_on_demand_feature_views, r.on_demand_feature_views), + (self._list_stream_feature_views, r.stream_feature_views), + (self._list_feature_services, r.feature_services), + (self._list_saved_datasets, r.saved_datasets), + (self._list_validation_references, r.validation_references), + (self._list_permissions, r.permissions), ]: - objs: List[Any] = lister(project_name, allow_cache=False) # type: ignore + objs: List[Any] = lister(project_name, tags=None, proto_only=True) # type: ignore if objs: - obj_protos = [obj.to_proto() for obj in objs] - for obj_proto in obj_protos: + for obj_proto in objs: if "spec" in obj_proto.DESCRIPTOR.fields_by_name: obj_proto.spec.project = project_name else: obj_proto.project = project_name - registry_proto_field.extend(obj_protos) + registry_proto_field.extend(objs) # This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783, # the registry proto only has a single infra field, which we're currently setting as the "last" project. @@ -1486,18 +1504,37 @@ def _list_objects( python_class: Any, proto_field_name: str, tags: Optional[dict[str, str]] = None, + proto_only: bool = False, + skip_udf: bool = False, ): + """ + Args: + proto_only: If True, return raw protobuf objects without calling + from_proto(). Used by proto() to build the RegistryProto cache + efficiently — avoids the from_proto()/to_proto() round-trip and + works uniformly for all object types (entities, data sources, etc.). + skip_udf: If True, call from_proto() but skip deserializing UDFs + (dill.loads). Returns Python objects suitable for filtering and + display without requiring the UDF's source module to be installed. + Only relevant for feature view types. + """ with self.read_engine.begin() as conn: stmt = select(table).where(table.c.project_id == project) rows = conn.execute(stmt).all() if rows: objects = [] for row in rows: - obj = python_class.from_proto( - proto_class.FromString(row._mapping[proto_field_name]) - ) - if utils.has_all_tags(obj.tags, tags): - objects.append(obj) + proto = proto_class.FromString(row._mapping[proto_field_name]) + if proto_only: + objects.append(proto) + else: + obj = ( + python_class.from_proto(proto, skip_udf=skip_udf) + if skip_udf + else python_class.from_proto(proto) + ) + if utils.has_all_tags(obj.tags, tags): + objects.append(obj) return objects return [] @@ -1568,7 +1605,7 @@ def _get_permission(self, name: str, project: str) -> Permission: ) def _list_permissions( - self, project: str, tags: Optional[dict[str, str]] + self, project: str, tags: Optional[dict[str, str]], **kwargs ) -> List[Permission]: return self._list_objects( permissions, @@ -1577,6 +1614,7 @@ def _list_permissions( Permission, "permission_proto", tags=tags, + **kwargs, ) def apply_permission( diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index dbd7738f21d..c69005478e0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -155,6 +155,7 @@ class OnDemandFeatureView(BaseFeatureView): udf: Optional[FunctionType] udf_string: Optional[str] aggregations: List[Aggregation] + _raw_feature_transformation_proto: Optional[Any] = None def __init__( # noqa: C901 self, @@ -609,7 +610,12 @@ def to_proto(self) -> OnDemandFeatureViewProto: request_data_source=self._input_schema_sentinel.to_proto() ) - feature_transformation = transformation_to_proto(self.feature_transformation) + if getattr(self, "_raw_feature_transformation_proto", None) is not None: + feature_transformation = self._raw_feature_transformation_proto + else: + feature_transformation = transformation_to_proto( + self.feature_transformation + ) tags = dict(self.tags) if self.tags else {} if self.track_metrics: @@ -727,6 +733,13 @@ def from_proto( else: on_demand_feature_view_obj.current_version_number = None + if skip_udf and on_demand_feature_view_proto.spec.HasField( + "feature_transformation" + ): + on_demand_feature_view_obj._raw_feature_transformation_proto = ( + on_demand_feature_view_proto.spec.feature_transformation + ) + # Set timestamps if present cls._set_timestamps_from_proto( on_demand_feature_view_proto, on_demand_feature_view_obj diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 061e9c29c4f..c55cd1cc3b2 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -392,6 +392,7 @@ def ListFeatureViews( project=request.project, allow_cache=request.allow_cache, tags=dict(request.tags), + skip_udf=True, ), ), actions=AuthzedAction.DESCRIBE, @@ -416,6 +417,7 @@ def ListAllFeatureViews( project=request.project, allow_cache=request.allow_cache, tags=dict(request.tags), + skip_udf=True, ), ) @@ -588,6 +590,7 @@ def ListStreamFeatureViews( project=request.project, allow_cache=request.allow_cache, tags=dict(request.tags), + skip_udf=True, ), ), actions=AuthzedAction.DESCRIBE, @@ -629,6 +632,7 @@ def ListOnDemandFeatureViews( project=request.project, allow_cache=request.allow_cache, tags=dict(request.tags), + skip_udf=True, ), ), actions=AuthzedAction.DESCRIBE, @@ -1138,6 +1142,7 @@ def ListFeatures(self, request: RegistryServer_pb2.ListFeaturesRequest, context) feature_views = self.proxied_registry.list_all_feature_views( project=request.project, allow_cache=allow_cache, + skip_udf=True, ) permitted_fvs = permitted_resources( resources=cast(list[FeastObject], feature_views), diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 3d0f98edae3..b461e85d50c 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -101,6 +101,8 @@ class StreamFeatureView(FeatureView): timestamp_field: str enable_tiling: bool tiling_hop_size: Optional[timedelta] + _raw_udf_proto: Optional[Any] = None + _raw_feature_transformation_proto: Optional[Any] = None def __init__( self, @@ -277,7 +279,12 @@ def to_proto(self): stream_source_proto = serialize_data_source(self.stream_source) udf_proto, feature_transformation = None, None - if self.udf: + if getattr(self, "_raw_udf_proto", None) is not None: + udf_proto = self._raw_udf_proto + feature_transformation = getattr( + self, "_raw_feature_transformation_proto", None + ) + elif self.udf: udf_proto = UserDefinedFunctionProto( name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), @@ -403,6 +410,13 @@ def from_proto(cls, sfv_proto, skip_udf: bool = False): else: stream_feature_view.current_version_number = None + if skip_udf and sfv_proto.spec.HasField("user_defined_function"): + stream_feature_view._raw_udf_proto = sfv_proto.spec.user_defined_function + if skip_udf and sfv_proto.spec.HasField("feature_transformation"): + stream_feature_view._raw_feature_transformation_proto = ( + sfv_proto.spec.feature_transformation + ) + stream_feature_view.entities = list(sfv_proto.spec.entities) stream_feature_view.features = [ diff --git a/sdk/python/tests/unit/infra/registry/test_sql_registry.py b/sdk/python/tests/unit/infra/registry/test_sql_registry.py index 5f144adbaf4..1a3ec92a4a6 100644 --- a/sdk/python/tests/unit/infra/registry/test_sql_registry.py +++ b/sdk/python/tests/unit/infra/registry/test_sql_registry.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys import tempfile +import types from datetime import timedelta +import dill import pytest from feast import Field @@ -23,7 +26,11 @@ from feast.errors import ConflictingFeatureViewNames from feast.feature_view import FeatureView from feast.infra.offline_stores.file_source import FileSource -from feast.infra.registry.sql import SqlRegistry, SqlRegistryConfig +from feast.infra.registry.sql import SqlRegistry, SqlRegistryConfig, feature_views +from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformationV2, + UserDefinedFunctionV2, +) from feast.stream_feature_view import StreamFeatureView from feast.types import Float32 from feast.value_type import ValueType @@ -44,6 +51,13 @@ def sqlite_registry(): registry.teardown() +@pytest.fixture +def shared_sqlite_db_path(): + """Return a shared SQLite DB path for cross-project tests.""" + fd, path = tempfile.mkstemp() + yield path + + def test_sql_registry(sqlite_registry): """ Test the SQL registry @@ -105,3 +119,104 @@ def test_feature_view_name_conflict_between_stream_and_batch(sqlite_registry): with pytest.raises(ConflictingFeatureViewNames): sqlite_registry.apply_feature_view(stream_view, "test_project") + + +def _serialize_udf_referencing_module(module_name: str) -> bytes: + """Create a dill-serialized UDF that references a fake module. + + The function is defined inside a temporary module so that dill records + a dependency on that module. After serialization, the module is removed + from sys.modules so that deserializing the bytes will raise + ModuleNotFoundError. + """ + mod = types.ModuleType(module_name) + mod.__package__ = module_name + sys.modules[module_name] = mod + exec("def _udf(x): return x", mod.__dict__) + udf_bytes = dill.dumps(mod._udf) + del sys.modules[module_name] + return udf_bytes + + +def test_shared_registry_cross_project_udf_does_not_crash(shared_sqlite_db_path): + """Initializing a SqlRegistry must not crash when another project in the + same database has a feature view whose UDF references a module that is + not installed in the current environment. + + Before the fix, proto() called from_proto() on every feature view across + all projects, triggering dill.loads() which raised ModuleNotFoundError. + """ + db_url = f"sqlite:///{shared_sqlite_db_path}" + config = SqlRegistryConfig( + registry_type="sql", path=db_url, purge_feast_metadata=False + ) + + registry_a = SqlRegistry(config, "project_a", None) + + entity = Entity(name="driver", join_keys=["driver_id"]) + registry_a.apply_entity(entity, "project_a") + + file_source = FileSource( + path="driver_stats.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + fv = FeatureView( + name="driver_features", + entities=[entity], + ttl=timedelta(days=1), + schema=[Field(name="conv_rate", dtype=Float32)], + source=file_source, + ) + registry_a.apply_feature_view(fv, "project_a") + + # Inject a UDF body that references a non-existent module directly into + # the DB, simulating a feature view from another project that uses a + # module not available in this environment. + fake_udf_bytes = _serialize_udf_referencing_module("nonexistent_project_module") + + fv_proto = fv.to_proto() + fv_proto.spec.project = "project_a" + fv_proto.spec.feature_transformation.CopyFrom( + FeatureTransformationV2( + user_defined_function=UserDefinedFunctionV2( + name="fake_udf", + body=fake_udf_bytes, + body_text="def _udf(x): return x", + mode="python", + ) + ) + ) + with registry_a.write_engine.begin() as conn: + from sqlalchemy import update + + stmt = ( + update(feature_views) + .where( + feature_views.c.feature_view_name == "driver_features", + feature_views.c.project_id == "project_a", + ) + .values(feature_view_proto=fv_proto.SerializeToString()) + ) + conn.execute(stmt) + + # Creating a new SqlRegistry for project_b against the same DB should + # NOT crash even though project_a has a UDF referencing an unavailable + # module. proto() should read raw protos without deserializing UDFs. + registry_b = SqlRegistry(config, "project_b", None) + + entity_b = Entity(name="customer", join_keys=["customer_id"]) + registry_b.apply_entity(entity_b, "project_b") + retrieved = registry_b.get_entity("customer", "project_b") + assert retrieved.name == "customer" + + # Verify project_a's data is still accessible in the cached proto + proto = registry_b.proto() + project_names = [p.spec.name for p in proto.projects] + assert "project_a" in project_names + assert "project_b" in project_names + + fv_names = [fv.spec.name for fv in proto.feature_views] + assert "driver_features" in fv_names + + registry_a.teardown() From f1cb1c8ebb45e63c76184c3093a2a1b1436e6bae Mon Sep 17 00:00:00 2001 From: ntkathole Date: Fri, 22 May 2026 16:35:17 +0530 Subject: [PATCH 2/2] fix: PyJWT 2.10+ added validation that rejects empty HMAC keys Signed-off-by: ntkathole --- .../client/intra_comm_authentication_client_manager.py | 2 +- .../feast/permissions/client/kubernetes_auth_client_manager.py | 2 +- .../permissions/client/oidc_authentication_client_manager.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/permissions/client/intra_comm_authentication_client_manager.py b/sdk/python/feast/permissions/client/intra_comm_authentication_client_manager.py index 30476316c12..bdc6159a2f8 100644 --- a/sdk/python/feast/permissions/client/intra_comm_authentication_client_manager.py +++ b/sdk/python/feast/permissions/client/intra_comm_authentication_client_manager.py @@ -29,4 +29,4 @@ def get_token(self): f"No Auth client manager implemented for the auth type:{self.auth_config.type}" ) - return jwt.encode(payload, "") + return jwt.encode(payload, "", algorithm="none") diff --git a/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py b/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py index 0cee687d08f..ac94b8713ad 100644 --- a/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py +++ b/sdk/python/feast/permissions/client/kubernetes_auth_client_manager.py @@ -22,7 +22,7 @@ def get_token(self): "sub": f":::{intra_communication_base64}", # Subject claim } - return jwt.encode(payload, "") + return jwt.encode(payload, "", algorithm="none") # Check if user token is provided in config (for external users) if hasattr(self.auth_config, "user_token") and self.auth_config.user_token: diff --git a/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py b/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py index 84a0c0115c9..37e613dafc1 100644 --- a/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py +++ b/sdk/python/feast/permissions/client/oidc_authentication_client_manager.py @@ -24,7 +24,7 @@ def get_token(self): payload = { "preferred_username": f"{intra_communication_base64}", } - return jwt.encode(payload, "") + return jwt.encode(payload, "", algorithm="none") if self.auth_config.token: return self.auth_config.token