Skip to content
Prev Previous commit
Next Next commit
initial commit targetting grpc registry server
Signed-off-by: Daniele Martinoli <dmartino@redhat.com>
  • Loading branch information
dmartinol committed Aug 29, 2024
commit d94c9ac9522b7148e258559cc44292652c94c2c3
56 changes: 53 additions & 3 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Any, List, Optional, Set

from colorama import Fore, Style
Expand All @@ -6,16 +7,65 @@

from feast.field import Field

logger = logging.getLogger(__name__)


class FeastError(Exception):
pass

def rpc_status_code(self) -> GrpcStatusCode:
def grpc_status_code(self) -> GrpcStatusCode:
return GrpcStatusCode.INTERNAL

def http_status_code(self) -> int:
return HttpStatusCode.HTTP_500_INTERNAL_SERVER_ERROR

def __str__(self) -> str:
if hasattr(self, "__overridden_message__"):
return str(getattr(self, "__overridden_message__"))
return super().__str__()

def __repr__(self) -> str:
if hasattr(self, "__overridden_message__"):
return f"{type(self).__name__}('{getattr(self,'__overridden_message__')}')"
return super().__repr__()

def to_error_detail(self) -> str:
"""
Returns a JSON representation of the error for serialization purposes.

Returns:
str: a string representation of a JSON document including `module`, `class` and `message` fields.
"""
import json

m = {
"module": f"{type(self).__module__}",
"class": f"{type(self).__name__}",
"message": f"{str(self)}",
}
return json.dumps(m)

@staticmethod
def from_error_detail(detail: str) -> Optional["FeastError"]:
import importlib
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are standard lib libraries, why not import top-level?

import json

try:
m = json.loads(detail)
if all(f in m for f in ["module", "class", "message"]):
module_name = m["module"]
class_name = m["class"]
message = m["message"]
module = importlib.import_module(module_name)
class_reference = getattr(module, class_name)

instance = class_reference(message)
setattr(instance, "__overridden_message__", message)
return instance
except Exception as e:
logger.warning(f"Invalid error detail: {detail}: {e}")
return None


class DataSourceNotFoundException(FeastError):
def __init__(self, path):
Expand All @@ -41,7 +91,7 @@ def __init__(self, ds_name: str):
class FeastObjectNotFoundException(FeastError):
pass

def rpc_status_code(self) -> GrpcStatusCode:
def grpc_status_code(self) -> GrpcStatusCode:
return GrpcStatusCode.NOT_FOUND

def http_status_code(self) -> int:
Expand Down Expand Up @@ -443,7 +493,7 @@ class FeastPermissionError(FeastError, PermissionError):
def __init__(self, details: str):
super().__init__(f"Permission error:\n{details}")

def rpc_status_code(self) -> GrpcStatusCode:
def grpc_status_code(self) -> GrpcStatusCode:
return GrpcStatusCode.PERMISSION_DENIED

def http_status_code(self) -> int:
Expand Down
48 changes: 48 additions & 0 deletions sdk/python/feast/grpc_error_interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import grpc

from feast.errors import FeastError


def exception_wrapper(behavior, request, context):
try:
return behavior(request, context)
except grpc.RpcError as e:
context.abort(e.code(), e.details())
except FeastError as e:
context.abort(
e.grpc_status_code(),
e.to_error_detail(),
)


class ErrorInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
handler = continuation(handler_call_details)
if handler is None:
return None

if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.unary_stream:
return grpc.unary_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_unary:
return grpc.stream_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_stream:
return grpc.stream_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
return handler
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import grpc

from feast.errors import from_error_detail
from feast.errors import FeastError
from feast.permissions.auth_model import AuthConfig
from feast.permissions.client.auth_client_manager_factory import get_auth_token

Expand Down Expand Up @@ -42,7 +42,7 @@ def _handle_call(self, continuation, client_call_details, request_iterator):
client_call_details = self._append_auth_header_metadata(client_call_details)
result = continuation(client_call_details, request_iterator)
if result.exception() is not None:
mapped_error = from_error_detail(result.exception().details())
mapped_error = FeastError.from_error_detail(result.exception().details())
if mapped_error is not None:
raise mapped_error
return result
Expand Down
22 changes: 0 additions & 22 deletions sdk/python/feast/permissions/server/grpc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
from typing import Optional

import grpc

Expand All @@ -9,32 +8,11 @@
get_auth_manager,
)
from feast.permissions.security_manager import get_security_manager
from feast.permissions.server.utils import (
AuthManagerType,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def grpc_interceptors(
auth_type: AuthManagerType,
) -> Optional[list[grpc.ServerInterceptor]]:
"""
A list of the authorization interceptors.

Args:
auth_type: The type of authorization manager, from the feature store configuration.

Returns:
list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`.
"""
if auth_type == AuthManagerType.NONE:
return None

return [AuthInterceptor(), ErrorInterceptor()]


class AuthInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
sm = get_security_manager()
Expand Down
24 changes: 22 additions & 2 deletions sdk/python/feast/registry_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from concurrent import futures
from datetime import datetime, timezone
from typing import Union, cast
from typing import Optional, Union, cast

import grpc
from google.protobuf.empty_pb2 import Empty
Expand All @@ -13,6 +13,7 @@
from feast.errors import FeatureViewNotFoundException
from feast.feast_object import FeastObject
from feast.feature_view import FeatureView
from feast.grpc_error_interceptor import ErrorInterceptor
from feast.infra.infra_object import Infra
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
Expand All @@ -23,8 +24,9 @@
assert_permissions_to_update,
permitted_resources,
)
from feast.permissions.server.grpc import grpc_interceptors
from feast.permissions.server.grpc import AuthInterceptor
from feast.permissions.server.utils import (
AuthManagerType,
ServerType,
init_auth_manager,
init_security_manager,
Expand Down Expand Up @@ -668,3 +670,21 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr
server.wait_for_termination()
else:
return server


def grpc_interceptors(
auth_type: AuthManagerType,
) -> Optional[list[grpc.ServerInterceptor]]:
"""
A list of the authorization interceptors.
Comment thread
dmartinol marked this conversation as resolved.
Outdated

Args:
auth_type: The type of authorization manager, from the feature store configuration.

Returns:
list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`.
"""
if auth_type == AuthManagerType.NONE:
return [ErrorInterceptor()]

return [AuthInterceptor(), ErrorInterceptor()]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from feast import (
FeatureStore,
)
from feast.errors import (
EntityNotFoundException,
FeastPermissionError,
FeatureViewNotFoundException,
)
from feast.permissions.permission import Permission
from feast.registry_server import start_server
from feast.wait import wait_retry_backoff # noqa: E402
Expand Down Expand Up @@ -70,7 +75,9 @@ def test_registry_apis(
print(f"Running for\n:{auth_config}")
remote_feature_store = get_remote_registry_store(server_port, feature_store)
permissions = _test_list_permissions(remote_feature_store, applied_permissions)
_test_get_entity(remote_feature_store, applied_permissions)
_test_list_entities(remote_feature_store, applied_permissions)
_test_get_fv(remote_feature_store, applied_permissions)
_test_list_fvs(remote_feature_store, applied_permissions)

if _permissions_exist_in_permission_list(
Expand Down Expand Up @@ -118,6 +125,20 @@ def _test_get_historical_features(client_fs: FeatureStore):
assertpy.assert_that(training_df).is_not_none()


def _test_get_entity(client_fs: FeatureStore, permissions: list[Permission]):
if not _is_auth_enabled(client_fs) or _is_permission_enabled(
client_fs, permissions, read_entities_perm
):
entity = client_fs.get_entity("driver")
assertpy.assert_that(entity).is_not_none()
assertpy.assert_that(entity.name).is_equal_to("driver")
else:
with pytest.raises(FeastPermissionError):
client_fs.get_entity("driver")
with pytest.raises(EntityNotFoundException):
client_fs.get_entity("invalid-name")


def _test_list_entities(client_fs: FeatureStore, permissions: list[Permission]):
entities = client_fs.list_entities()

Expand Down Expand Up @@ -188,6 +209,20 @@ def _is_auth_enabled(client_fs: FeatureStore) -> bool:
return client_fs.config.auth_config.type != "no_auth"


def _test_get_fv(client_fs: FeatureStore, permissions: list[Permission]):
if not _is_auth_enabled(client_fs) or _is_permission_enabled(
client_fs, permissions, read_fv_perm
):
fv = client_fs.get_feature_view("driver_hourly_stats")
assertpy.assert_that(fv).is_not_none()
assertpy.assert_that(fv.name).is_equal_to("driver_hourly_stats")
else:
with pytest.raises(FeastPermissionError):
client_fs.get_feature_view("driver_hourly_stats")
with pytest.raises(FeatureViewNotFoundException):
client_fs.get_feature_view("invalid-name")


def _test_list_fvs(client_fs: FeatureStore, permissions: list[Permission]):
if _is_auth_enabled(client_fs) and _permissions_exist_in_permission_list(
[invalid_list_entities_perm], permissions
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/tests/unit/test_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import re

import assertpy

import feast.errors as errors


def test_error_error_detail():
e = errors.FeatureViewNotFoundException("abc")

d = e.to_error_detail()

assertpy.assert_that(d).is_not_none()
assertpy.assert_that(d).contains('"module": "feast.errors"')
assertpy.assert_that(d).contains('"class": "FeatureViewNotFoundException"')
assertpy.assert_that(re.search(r"abc", d)).is_true()

converted_e = errors.FeastError.from_error_detail(d)
assertpy.assert_that(converted_e).is_not_none()
assertpy.assert_that(str(converted_e)).is_equal_to(str(e))
assertpy.assert_that(repr(converted_e)).is_equal_to(repr(e))


def test_invalid_error_error_detail():
e = errors.FeastError.from_error_detail("invalid")
assertpy.assert_that(e).is_none()