Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Added list proto methods
  • Loading branch information
Bhargav Dodla committed Oct 23, 2024
commit 9818798d8cd94a5322eaa168d040afe9fdb3dffb
108 changes: 108 additions & 0 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
Expand Down Expand Up @@ -467,3 +478,100 @@ def _start_thread_async_refresh(self, cache_ttl_seconds):

def _exit_handler(self):
self.registry_refresh_thread.cancel()

# Methods to improve the registry calls

@abstractmethod
def _list_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureViewProtoList]:
pass

def list_feature_views_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureViewProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_views_proto(
self.cached_registry_proto, project, tags
)
return self._list_feature_views_proto(project, tags)

@abstractmethod
def _list_entities_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[EntityProtoList]:
pass

def list_entities_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[EntityProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_entities_proto(
self.cached_registry_proto, project, tags
)
return self._list_entities_proto(project, tags)

@abstractmethod
def _list_data_sources_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[DataSourceProtoList]:
pass

def list_data_sources_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[DataSourceProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_data_sources_proto(
self.cached_registry_proto, project, tags
)
return self._list_data_sources_proto(project, tags)

@abstractmethod
def _list_on_demand_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[OnDemandFeatureViewProtoList]:
pass

def list_on_demand_feature_views_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[OnDemandFeatureViewProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_on_demand_feature_views_proto(
self.cached_registry_proto, project, tags
)
return self._list_on_demand_feature_views_proto(project, tags)

@abstractmethod
def _list_feature_services_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureServiceProtoList]:
pass

def list_feature_services_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureServiceProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_services_proto(
self.cached_registry_proto, project, tags
)
return self._list_feature_services_proto(project, tags)
76 changes: 76 additions & 0 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
Expand Down Expand Up @@ -367,3 +378,68 @@ def get_project(registry_proto: RegistryProto, name: str) -> Project:
if projects_proto.spec.name == name:
return Project.from_proto(projects_proto)
raise ProjectObjectNotFoundException(name=name)


@registry_proto_cache_with_tags
def list_feature_views_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureViewProtoList]:
feature_views: List[FeatureViewProtoList] = []
for feature_view_proto in registry_proto.feature_views:
if feature_view_proto.spec.project == project and utils.has_all_tags(
feature_view_proto.spec.tags, tags
):
feature_views.append(feature_view_proto)
return feature_views


@registry_proto_cache_with_tags
def list_feature_services_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureServiceProtoList]:
feature_services = []
for feature_service_proto in registry_proto.feature_services:
if feature_service_proto.spec.project == project and utils.has_all_tags(
feature_service_proto.spec.tags, tags
):
feature_services.append(feature_service_proto)
return feature_services


@registry_proto_cache_with_tags
def list_on_demand_feature_views_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[OnDemandFeatureViewProtoList]:
on_demand_feature_views = []
for on_demand_feature_view in registry_proto.on_demand_feature_views:
if on_demand_feature_view.spec.project == project and utils.has_all_tags(
on_demand_feature_view.spec.tags, tags
):
on_demand_feature_views.append(on_demand_feature_view)
return on_demand_feature_views


@registry_proto_cache_with_tags
def list_entities_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[EntityProtoList]:
entities = []
for entity_proto in registry_proto.entities:
if entity_proto.spec.project == project and utils.has_all_tags(
entity_proto.spec.tags, tags
):
entities.append(entity_proto)
return entities


@registry_proto_cache_with_tags
def list_data_sources_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[DataSourceProtoList]:
data_sources = []
for data_source_proto in registry_proto.data_sources:
if data_source_proto.project == project and utils.has_all_tags(
data_source_proto.tags, tags
):
data_sources.append(data_source_proto)
return data_sources
86 changes: 86 additions & 0 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,26 @@
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Permission_pb2 import Permission as PermissionProto
from feast.protos.feast.core.Project_pb2 import Project as ProjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand Down Expand Up @@ -1326,3 +1337,78 @@ def get_project_metadata(
datetime.utcfromtimestamp(int(metadata_value))
)
return project_metadata_model

def _list_objects_proto(
self,
table: Table,
project: str,
proto_class: Any,
proto_field_name: str,
tags: Optional[dict[str, str]] = None,
):
with self.read_engine.begin() as conn:
stmt = select(table).where(table.c.project_id == project)
rows = conn.execute(stmt).all()
if rows:
objects = []
for row in rows:
obj = proto_class.FromString(row._mapping[proto_field_name])
if utils.has_all_tags(dict(obj.spec.tags), tags):
objects.append(obj)
return objects
return []

def _list_feature_services_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureServiceProtoList]:
return self._list_objects_proto(
feature_services,
project,
FeatureServiceProto,
"feature_service_proto",
tags=tags,
)

def _list_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureViewProtoList]:
return self._list_objects_proto(
feature_views,
project,
FeatureViewProto,
"feature_view_proto",
tags=tags,
)

def _list_on_demand_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[OnDemandFeatureViewProtoList]:
return self._list_objects_proto(
on_demand_feature_views,
project,
OnDemandFeatureViewProto,
"feature_view_proto",
tags=tags,
)

def _list_entities_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[EntityProtoList]:
return self._list_objects_proto(
entities,
project,
EntityProto,
"entity_proto",
tags=tags,
)

def _list_data_sources_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[DataSourceProtoList]:
return self._list_objects_proto(
data_sources,
project,
DataSourceProto,
"data_source_proto",
tags=tags,
)