Skip to content
Next Next commit
initial commit targetting grpc registry server
Signed-off-by: Daniele Martinoli <dmartino@redhat.com>
  • Loading branch information
dmartinol committed Aug 28, 2024
commit 3e07339fd8506d3cf778eb546b723361a4514361
29 changes: 28 additions & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, List, Set
from typing import Any, List, Optional, Set

from colorama import Fore, Style

Expand Down Expand Up @@ -419,3 +419,30 @@ def __init__(self, query: str):
class ZeroColumnQueryResult(Exception):
def __init__(self, query: str):
super().__init__(f"This query returned zero columns:\n{query}")


def to_error_detail(error: Exception) -> str:
import json

m = {
"module": f"{type(error).__module__}",
"class": f"{type(error).__name__}",
"message": f"{str(error)}",
}
return json.dumps(m)


def from_error_detail(detail: str) -> Optional[Exception]:
import importlib
import json

m = json.loads(detail)
if all(f in m for f in ["module", "class", "message"]):
module_name = m["module"]
class_name = m["class"]
message = m["message"]
module = importlib.import_module(module_name)
ClassReference = getattr(module, class_name)
instance = ClassReference(message)
return instance
return None
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import grpc

from feast.errors import from_error_detail
from feast.permissions.auth_model import AuthConfig
from feast.permissions.client.auth_client_manager_factory import get_auth_token

Expand All @@ -20,26 +21,31 @@ def __init__(self, auth_type: AuthConfig):
def intercept_unary_unary(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_unary_stream(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return self._handle_call(continuation, client_call_details, request_iterator)

def _handle_call(self, continuation, client_call_details, request_iterator):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
result = continuation(client_call_details, request_iterator)
if result.exception() is not None:
mapped_error = from_error_detail(result.exception().details())
if mapped_error is not None:
raise mapped_error
return result

def _append_auth_header_metadata(self, client_call_details):
logger.debug(
Expand Down
55 changes: 54 additions & 1 deletion sdk/python/feast/permissions/server/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import grpc

from feast.errors import FeastObjectNotFoundException, to_error_detail
from feast.permissions.auth.auth_manager import (
get_auth_manager,
)
Expand Down Expand Up @@ -31,7 +32,7 @@ def grpc_interceptors(
if auth_type == AuthManagerType.NONE:
return None

return [AuthInterceptor()]
return [AuthInterceptor(), ErrorInterceptor()]
Comment thread
dmartinol marked this conversation as resolved.
Outdated


class AuthInterceptor(grpc.ServerInterceptor):
Expand All @@ -52,3 +53,55 @@ def intercept_service(self, continuation, handler_call_details):
sm.set_current_user(current_user)

return continuation(handler_call_details)


class ErrorInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
def exception_wrapper(behavior, request, context):
try:
return behavior(request, context)
except grpc.RpcError as e:
context.abort(e.code(), e.details())
except Exception as e:
context.abort(
_error_to_status_code(e),
to_error_detail(e),
)

handler = continuation(handler_call_details)
if handler is None:
return None

if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.unary_stream:
return grpc.unary_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_unary:
return grpc.stream_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_stream:
return grpc.stream_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
return handler


def _error_to_status_code(error: Exception) -> grpc.StatusCode:
if isinstance(error, FeastObjectNotFoundException):
return grpc.StatusCode.NOT_FOUND
if isinstance(error, FeastObjectNotFoundException):
Comment thread
dmartinol marked this conversation as resolved.
Outdated
return grpc.StatusCode.PERMISSION_DENIED
return grpc.StatusCode.INTERNAL