Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: Extend FeastError and fixed integration tests
Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com>
  • Loading branch information
Bhargav Dodla committed Sep 4, 2024
commit 28e11eb77ba31b1ace885ebd2b61d0c5623d1741
2 changes: 1 addition & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def __init__(self, name, project=None):
super().__init__(f"Permission {name} does not exist")


class ProjectNotFoundException(Exception):
class ProjectNotFoundException(FeastError):
def __init__(self, project):
super().__init__(f"Project {project} does not exist in registry")

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def online_read(
result_tuples.append((event_ts, feature_values_dict))
return result_tuples
else:
error_msg = f"Unable to retrieve the online store data using feature server API. Error_code={response.status_code}, error_message={response.reason}"
error_msg = f"Unable to retrieve the online store data using feature server API. Error_code={response.status_code}, error_message={response.text}"
logger.error(error_msg)
raise RuntimeError(error_msg)

Expand Down
18 changes: 9 additions & 9 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def apply_data_source(
self.commit()

def delete_data_source(self, name: str, project: str, commit: bool = True):
self._get_registry_proto(project=project, allow_cache=False)
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, data_source_proto in enumerate(
Expand Down Expand Up @@ -617,7 +617,7 @@ def get_stream_feature_view(
)

def delete_feature_service(self, name: str, project: str, commit: bool = True):
self._get_registry_proto(project=project, allow_cache=False)
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, feature_service_proto in enumerate(
Expand All @@ -634,7 +634,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True):
raise FeatureServiceNotFoundException(name, project)

def delete_feature_view(self, name: str, project: str, commit: bool = True):
self._get_registry_proto(project=project, allow_cache=False)
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, existing_feature_view_proto in enumerate(
Expand Down Expand Up @@ -676,7 +676,7 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True):
raise FeatureViewNotFoundException(name, project)

def delete_entity(self, name: str, project: str, commit: bool = True):
self._get_registry_proto(project=project, allow_cache=False)
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, existing_entity_proto in enumerate(
Expand Down Expand Up @@ -801,15 +801,16 @@ def list_validation_references(
)

def delete_validation_reference(self, name: str, project: str, commit: bool = True):
registry_proto = self._get_registry_proto(project=project, allow_cache=False)
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto
for idx, existing_validation_reference in enumerate(
registry_proto.validation_references
self.cached_registry_proto.validation_references
):
if (
existing_validation_reference.name == name
and existing_validation_reference.project == project
):
del registry_proto.validation_references[idx]
del self.cached_registry_proto.validation_references[idx]
if commit:
self.commit()
return
Expand Down Expand Up @@ -968,7 +969,7 @@ def apply_permission(
self.commit()

def delete_permission(self, name: str, project: str, commit: bool = True):
self._get_registry_proto(project=project, allow_cache=False)
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, permission_proto in enumerate(self.cached_registry_proto.permissions):
Expand Down Expand Up @@ -1026,7 +1027,6 @@ def delete_project(
name: str,
commit: bool = True,
):
self._get_registry_proto(project=name, allow_cache=False)
assert self.cached_registry_proto

for idx, project_proto in enumerate(self.cached_registry_proto.projects):
Expand Down
26 changes: 1 addition & 25 deletions sdk/python/feast/infra/registry/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime, timedelta, timezone
from enum import Enum
from threading import Lock
from typing import Any, Callable, List, Literal, Optional, Set, Union
from typing import Any, Callable, List, Literal, Optional, Union

from pydantic import ConfigDict, Field, StrictStr

Expand Down Expand Up @@ -1101,30 +1101,6 @@ def process_project(project: Project):

return r

def _get_all_projects(self) -> Set[str]:
projects = set()

base_tables = [
"DATA_SOURCES",
"ENTITIES",
"FEATURE_VIEWS",
"ON_DEMAND_FEATURE_VIEWS",
"STREAM_FEATURE_VIEWS",
"PERMISSIONS",
]

with GetSnowflakeConnection(self.registry_config) as conn:
for table in base_tables:
query = (
f'SELECT DISTINCT project_id FROM {self.registry_path}."{table}"'
)
df = execute_snowflake_statement(conn, query).fetch_pandas_all()

for row in df.iterrows():
projects.add(row[1]["PROJECT_ID"])

return projects

def _get_last_updated_metadata(self, project: str):
with GetSnowflakeConnection(self.registry_config) as conn:
query = f"""
Expand Down
20 changes: 1 addition & 19 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union
from typing import Any, Callable, Dict, List, Optional, Union

from pydantic import StrictInt, StrictStr
from sqlalchemy import ( # type: ignore
Expand Down Expand Up @@ -1043,24 +1043,6 @@ def _get_last_updated_metadata(self, project: str):

return datetime.fromtimestamp(update_time, tz=timezone.utc)

def _get_all_projects(self) -> Set[str]:
projects = set()
with self.engine.begin() as conn:
for table in {
entities,
data_sources,
feature_views,
on_demand_feature_views,
stream_feature_views,
permissions,
}:
stmt = select(table)
rows = conn.execute(stmt).all()
for row in rows:
projects.add(row._mapping["project_id"])

return projects

def _get_permission(self, name: str, project: str) -> Permission:
return self._get_object(
table=permissions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ def _create_remote_client_feature_store(
auth_config=auth_config,
)

result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir)
assert result.returncode == 0

return FeatureStore(repo_path=repo_path)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging
from datetime import datetime

import assertpy
Expand Down Expand Up @@ -28,8 +27,6 @@
from tests.utils.auth_permissions_util import get_remote_registry_store
from tests.utils.http_server import check_port_open # noqa: E402

logger = logging.getLogger(__name__)


@pytest.fixture
def start_registry_server(
Expand All @@ -39,7 +36,6 @@ def start_registry_server(
feature_store,
monkeypatch,
):
logger.info(f"Starting Registry call at {server_port}")
if "kubernetes" in auth_config:
mock_utils.mock_kubernetes(request=request, monkeypatch=monkeypatch)
elif "oidc" in auth_config:
Expand All @@ -53,29 +49,23 @@ def start_registry_server(
assertpy.assert_that(server_port).is_not_equal_to(0)

print(f"Starting Registry at {server_port}")
logger.info(f"{auth_config}: {server_port}: Starting Registry at {server_port}")
server = start_server(
feature_store,
server_port,
wait_for_termination=False,
)
print("Waiting server availability")
logger.info(f"{auth_config}: {server_port}: Waiting server availability")
wait_retry_backoff(
lambda: (None, check_port_open("localhost", server_port)),
timeout_secs=10,
)
print("Server started")
logger.info(f"{auth_config}: {server_port}: Server started")

yield server

print("Stopping server")
logger.info(f"{auth_config}: {server_port}: Stopping server")
server.stop(grace=None) # Teardown server

# server.wait_for_termination()


def test_registry_apis(
auth_config,
Expand All @@ -86,9 +76,6 @@ def test_registry_apis(
applied_permissions,
):
print(f"Running for\n:{auth_config}")
logger.info(
f"Running Perm Tests for\n:{auth_config} : {applied_permissions}: {server_port}"
)
remote_feature_store = get_remote_registry_store(server_port, feature_store)
permissions = _test_list_permissions(remote_feature_store, applied_permissions)
_test_get_entity(remote_feature_store, applied_permissions)
Expand Down