diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index c188bd0d7b4..ab88eca89fc 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -10,6 +10,7 @@ import psutil from dateutil import parser from fastapi import Depends, FastAPI, Request, Response, status +from fastapi.concurrency import run_in_threadpool from fastapi.logger import logger from fastapi.responses import JSONResponse from google.protobuf.json_format import MessageToDict @@ -112,7 +113,7 @@ async def get_body(request: Request): "/get-online-features", dependencies=[Depends(inject_user_details)], ) - def get_online_features(body=Depends(get_body)): + async def get_online_features(body=Depends(get_body)): body = json.loads(body) full_feature_names = body.get("full_feature_names", False) entity_rows = body["entities"] @@ -145,15 +146,22 @@ def get_online_features(body=Depends(get_body)): resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE] ) - response_proto = store.get_online_features( + read_params = dict( features=features, entity_rows=entity_rows, full_feature_names=full_feature_names, - ).proto + ) + + if store._get_provider().async_supported.online.read: + response = await store.get_online_features_async(**read_params) + else: + response = await run_in_threadpool( + lambda: store.get_online_features(**read_params) + ) # Convert the Protobuf object to JSON and return it return MessageToDict( - response_proto, preserving_proto_field_name=True, float_precision=18 + response.proto, preserving_proto_field_name=True, float_precision=18 ) @app.post("/push", dependencies=[Depends(inject_user_details)]) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index a6cbfb41d2b..a915d2ee34b 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -23,6 +23,7 @@ from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.supported_async_methods import SupportedAsyncMethods from feast.protos.feast.core.DynamoDBTable_pb2 import ( DynamoDBTable as DynamoDBTableProto, ) @@ -88,6 +89,10 @@ class DynamoDBOnlineStore(OnlineStore): _dynamodb_resource = None _aioboto_session = None + @property + def async_supported(self) -> SupportedAsyncMethods: + return SupportedAsyncMethods(read=True) + def update( self, config: RepoConfig, diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index cf2d68eb746..be2065040b6 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -22,6 +22,7 @@ from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.supported_async_methods import SupportedAsyncMethods from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -36,6 +37,10 @@ class OnlineStore(ABC): The interface that Feast uses to interact with the storage system that handles online features. """ + @property + def async_supported(self) -> SupportedAsyncMethods: + return SupportedAsyncMethods() + @abstractmethod def online_write_batch( self, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 5acfc0d6f34..ea75cf5ff20 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -35,6 +35,7 @@ from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import Provider from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.supported_async_methods import ProviderAsyncMethods from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -79,6 +80,12 @@ def offline_store(self): ) return self._offline_store + @property + def async_supported(self) -> ProviderAsyncMethods: + return ProviderAsyncMethods( + online=self.online_store.async_supported, + ) + @property def batch_engine(self) -> BatchMaterializationEngine: if self._batch_engine: diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 0723e0513fd..fb483d194ed 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -27,6 +27,7 @@ from feast.infra.infra_object import Infra from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.supported_async_methods import ProviderAsyncMethods from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -55,6 +56,10 @@ class Provider(ABC): def __init__(self, config: RepoConfig): pass + @property + def async_supported(self) -> ProviderAsyncMethods: + return ProviderAsyncMethods() + @abstractmethod def update_infra( self, diff --git a/sdk/python/feast/infra/supported_async_methods.py b/sdk/python/feast/infra/supported_async_methods.py new file mode 100644 index 00000000000..b675aa70406 --- /dev/null +++ b/sdk/python/feast/infra/supported_async_methods.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel, Field + + +class SupportedAsyncMethods(BaseModel): + read: bool = Field(default=False) + write: bool = Field(default=False) + + +class ProviderAsyncMethods(BaseModel): + online: SupportedAsyncMethods = Field(default_factory=SupportedAsyncMethods)