Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: Fixed lint issues
Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com>
  • Loading branch information
Bhargav Dodla committed Aug 24, 2024
commit 9d8fdac9b102b81a6f557eb6576f04ec43a87019
2 changes: 1 addition & 1 deletion protos/feast/registry/RegistryServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ message GetInfraRequest {
}

message ListProjectMetadataRequest {
string project = 1;
optional string project = 1;
bool allow_cache = 2;
}

Expand Down
18 changes: 18 additions & 0 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,24 @@ def list_project_metadata(
)
return self._list_project_metadata(project)

@abstractmethod
def _get_project_metadata(self, project: str) -> Optional[ProjectMetadata]:
pass

def get_project_metadata(
self, project: str, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
project_metadata_proto = proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
if project_metadata_proto is None:
return None
else:
return ProjectMetadata.from_proto(project_metadata_proto)
return self._get_project_metadata(project)

@abstractmethod
def _get_infra(self, project: str) -> Infra:
pass
Expand Down
81 changes: 78 additions & 3 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
FeatureServiceNotFoundException,
FeatureViewNotFoundException,
PermissionNotFoundException,
ProjectMetadataNotFoundException,
ValidationReferenceNotFound,
)
from feast.feature_service import FeatureService
Expand Down Expand Up @@ -214,9 +215,11 @@ def __init__(

self._registry_store = cls(registry_config, repo_path)
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
seconds=(
registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)
)

def clone(self) -> "Registry":
Expand Down Expand Up @@ -787,6 +790,12 @@ def delete_validation_reference(self, name: str, project: str, commit: bool = Tr
return
raise ValidationReferenceNotFound(name, project=project)

def apply_project_metadata(self, project: str, commit: bool = True):
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto
if commit:
self.commit()

def list_project_metadata(
self, project: Optional[str], allow_cache: bool = False
) -> List[ProjectMetadata]:
Expand All @@ -795,6 +804,72 @@ def list_project_metadata(
)
return proto_registry_utils.list_project_metadata(registry_proto, project)

def get_project_metadata(
self, project: str, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
registry_proto = self._get_registry_proto(
project=project, allow_cache=allow_cache
)
project_metadata_proto = proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
if project_metadata_proto is None:
return None
else:
return ProjectMetadata.from_proto(project_metadata_proto)

def delete_project_metadata(self, project: str, commit: bool = True):
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, project_metadata_proto in enumerate(
self.cached_registry_proto.project_metadata
):
if project_metadata_proto.project == project:
list_entities = self.list_entities(project)
list_feature_views = self.list_feature_views(project)
list_on_demand_feature_views = self.list_on_demand_feature_views(
project
)
list_stream_feature_views = self.list_stream_feature_views(project)
list_feature_services = self.list_feature_services(project)
list_data_sources = self.list_data_sources(project)
list_saved_datasets = self.list_saved_datasets(project)
list_validation_references = self.list_validation_references(project)
list_permissions = self.list_permissions(project)
for entity in list_entities:
self.delete_entity(entity.name, project, commit=False)
for feature_view in list_feature_views:
self.delete_feature_view(feature_view.name, project, commit=False)
for on_demand_feature_view in list_on_demand_feature_views:
self.delete_feature_view(
on_demand_feature_view.name, project, commit=False
)
for stream_feature_view in list_stream_feature_views:
self.delete_feature_view(
stream_feature_view.name, project, commit=False
)
for feature_service in list_feature_services:
self.delete_feature_service(
feature_service.name, project, commit=False
)
for data_source in list_data_sources:
self.delete_data_source(data_source.name, project, commit=False)
for saved_dataset in list_saved_datasets:
self.delete_saved_dataset(saved_dataset.name, project, commit=False)
for validation_reference in list_validation_references:
self.delete_validation_reference(
validation_reference.name, project, commit=False
)
for permission in list_permissions:
self.delete_permission(permission.name, project, commit=False)
del self.cached_registry_proto.project_metadata[idx]
if commit:
self.commit()
return

raise ProjectMetadataNotFoundException(project)

def commit(self):
"""Commits the state of the registry cache to the remote registry store."""
if self.cached_registry_proto:
Expand Down
39 changes: 26 additions & 13 deletions sdk/python/feast/infra/registry/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.permissions.auth.auth_type import AuthType
from feast.permissions.auth_model import (
AuthConfig,
NoAuthConfig,
)
from feast.permissions.auth_model import AuthConfig, NoAuthConfig
from feast.permissions.client.grpc_client_auth_interceptor import (
GrpcClientAuthHeaderInterceptor,
)
Expand Down Expand Up @@ -173,15 +170,17 @@ def apply_feature_view(
arg_name = "on_demand_feature_view"

request = RegistryServer_pb2.ApplyFeatureViewRequest(
feature_view=feature_view.to_proto()
if arg_name == "feature_view"
else None,
stream_feature_view=feature_view.to_proto()
if arg_name == "stream_feature_view"
else None,
on_demand_feature_view=feature_view.to_proto()
if arg_name == "on_demand_feature_view"
else None,
feature_view=(
feature_view.to_proto() if arg_name == "feature_view" else None
),
stream_feature_view=(
feature_view.to_proto() if arg_name == "stream_feature_view" else None
),
on_demand_feature_view=(
feature_view.to_proto()
if arg_name == "on_demand_feature_view"
else None
),
project=project,
commit=commit,
)
Expand Down Expand Up @@ -383,6 +382,20 @@ def list_project_metadata(
response = self.stub.ListProjectMetadata(request)
return [ProjectMetadata.from_proto(pm) for pm in response.project_metadata]

def apply_project_metadata(self, project: StrictStr, commit: bool = True):
# TODO: Add logic for applying project metadata
pass

def get_project_metadata(
self, project: StrictStr, allow_cache: bool = False
) -> ProjectMetadata | None:
# TODO: Add logic for getting project metadata
pass

def delete_project_metadata(self, project: StrictStr, commit: bool = True):
# TODO: Add logic for deleting project metadata
pass

def update_infra(self, infra: Infra, project: str, commit: bool = True):
request = RegistryServer_pb2.UpdateInfraRequest(
infra=infra.to_proto(), project=project, commit=commit
Expand Down
47 changes: 44 additions & 3 deletions sdk/python/feast/infra/registry/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ def __init__(
self.cached_registry_proto_created = _utc_now()
self._refresh_lock = Lock()
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
seconds=(
registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)
)
self.project = project

Expand Down Expand Up @@ -901,6 +903,7 @@ def apply_materialization(
def list_project_metadata(
self, project: Optional[str], allow_cache: bool = False
) -> List[ProjectMetadata]:
# TODO: List all projects when project is None
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_project_metadata(
Expand Down Expand Up @@ -928,6 +931,44 @@ def list_project_metadata(
return [project_metadata]
return []

def get_project_metadata(
self, project: str, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
project_metadata_proto = proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
if project_metadata_proto is None:
return None
else:
return ProjectMetadata.from_proto(project_metadata_proto)

with GetSnowflakeConnection(self.registry_config) as conn:
query = f"""
SELECT
metadata_key,
metadata_value
FROM
{self.registry_path}."FEAST_METADATA"
WHERE
project_id = '{project}'
"""
df = execute_snowflake_statement(conn, query).fetch_pandas_all()

if not df.empty:
project_metadata = ProjectMetadata(project_name=project)
for row in df.iterrows():
if row[1]["METADATA_KEY"] == FeastMetadataKeys.PROJECT_UUID.value:
project_metadata.project_uuid = row[1]["METADATA_VALUE"]
break
# TODO(adchia): Add other project metadata in a structured way
return project_metadata
return None

def delete_project_metadata(self, project: StrictStr, commit: bool = True):
pass

def apply_user_metadata(
self,
project: str,
Expand Down
51 changes: 46 additions & 5 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
FeatureServiceNotFoundException,
FeatureViewNotFoundException,
PermissionNotFoundException,
ProjectMetadataNotFoundException,
SavedDatasetNotFound,
ValidationReferenceNotFound,
)
Expand Down Expand Up @@ -513,14 +514,35 @@ def _list_project_metadata(self, project: Optional[str]) -> List[ProjectMetadata
]
if metadata_key == FeastMetadataKeys.PROJECT_UUID.value:
project_metadata_model.project_uuid = metadata_value

if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value:
project_metadata_model.last_updated_timestamp = (
datetime.fromtimestamp(int(metadata_value), tz=timezone.utc)
)
return list(project_metadata_dict.values())
return []

def _get_project_metadata(
self,
project: str,
) -> Optional[ProjectMetadata]:
"""
Returns given project metadata.
"""
with self.engine.begin() as conn:
stmt = select(feast_metadata).where(
feast_metadata.c.project_id == project,
)
rows = conn.execute(stmt).all()
if rows:
project_metadata: ProjectMetadata = ProjectMetadata(
project_name=project
)
for row in rows:
metadata_key = row._mapping["metadata_key"]
metadata_value = row._mapping["metadata_value"]

if metadata_key == FeastMetadataKeys.PROJECT_UUID.value:
project_metadata.project_uuid = metadata_value
return project_metadata
else:
return None

def apply_saved_dataset(
self,
saved_dataset: SavedDataset,
Expand Down Expand Up @@ -827,6 +849,25 @@ def apply_project_metadata(self, project):
insert_stmt = insert(feast_metadata).values(values)
conn.execute(insert_stmt)

def delete_project_metadata(self, project: str, commit: bool = True):
project_metadata = self.get_project_metadata(project, allow_cache=False)
if project_metadata is None:
raise ProjectMetadataNotFoundException(project)
with self.engine.begin() as conn:
for t in {
entities,
data_sources,
feature_views,
feature_services,
on_demand_feature_views,
saved_datasets,
validation_references,
managed_infra,
feast_metadata,
}:
stmt = delete(t).where(t.c.project_id == project)
conn.execute(stmt)

def _delete_object(
self,
table: Table,
Expand Down