Skip to content

Commit f68ebbf

Browse files
committed
add explicit async support tags, use in server
Signed-off-by: Rob Howley <howley.robert@gmail.com>
1 parent 3077696 commit f68ebbf

File tree

9 files changed

+68
-95
lines changed

9 files changed

+68
-95
lines changed

sdk/python/feast/feature_server.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import psutil
1111
from dateutil import parser
1212
from fastapi import Depends, FastAPI, Request, Response, status
13+
from fastapi.concurrency import run_in_threadpool
1314
from fastapi.logger import logger
1415
from fastapi.responses import JSONResponse
1516
from google.protobuf.json_format import MessageToDict
@@ -100,10 +101,8 @@ def async_refresh():
100101
@asynccontextmanager
101102
async def lifespan(app: FastAPI):
102103
async_refresh()
103-
await store.initialize()
104104
yield
105105
stop_refresh()
106-
await store.close()
107106

108107
app = FastAPI(lifespan=lifespan)
109108

@@ -114,7 +113,7 @@ async def get_body(request: Request):
114113
"/get-online-features",
115114
dependencies=[Depends(inject_user_details)],
116115
)
117-
def get_online_features(body=Depends(get_body)):
116+
async def get_online_features(body=Depends(get_body)):
118117
body = json.loads(body)
119118
full_feature_names = body.get("full_feature_names", False)
120119
entity_rows = body["entities"]
@@ -147,15 +146,22 @@ def get_online_features(body=Depends(get_body)):
147146
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
148147
)
149148

150-
response_proto = store.get_online_features(
149+
read_params = dict(
151150
features=features,
152151
entity_rows=entity_rows,
153152
full_feature_names=full_feature_names,
154-
).proto
153+
)
154+
155+
if store._get_provider().async_supported.online.read:
156+
response = await store.get_online_features_async(**read_params)
157+
else:
158+
response = await run_in_threadpool(
159+
lambda: store.get_online_features(**read_params)
160+
)
155161

156162
# Convert the Protobuf object to JSON and return it
157163
return MessageToDict(
158-
response_proto, preserving_proto_field_name=True, float_precision=18
164+
response.proto, preserving_proto_field_name=True, float_precision=18
159165
)
160166

161167
@app.post("/push", dependencies=[Depends(inject_user_details)])

sdk/python/feast/feature_store.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2078,14 +2078,6 @@ def list_saved_datasets(
20782078
self.project, allow_cache=allow_cache, tags=tags
20792079
)
20802080

2081-
async def initialize(self) -> None:
2082-
"""Initialize long-lived clients and/or resources needed for accessing datastores"""
2083-
await self._get_provider().initialize(self.config)
2084-
2085-
async def close(self) -> None:
2086-
"""Cleanup any long-lived clients and/or resources"""
2087-
await self._get_provider().close()
2088-
20892081

20902082
def _print_materialization_log(
20912083
start_date, end_date, num_feature_views: int, online_store: str

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from feast.feature_logging import LoggingConfig, LoggingSource
3636
from feast.feature_view import FeatureView
3737
from feast.infra.registry.base_registry import BaseRegistry
38+
from feast.infra.supported_async_methods import SupportedAsyncMethods
3839
from feast.on_demand_feature_view import OnDemandFeatureView
3940
from feast.repo_config import RepoConfig
4041
from feast.saved_dataset import SavedDatasetStorage
@@ -223,6 +224,10 @@ class OfflineStore(ABC):
223224
the SnowflakeOfflineStore can handle SnowflakeSources but not FileSources.
224225
"""
225226

227+
@property
228+
def async_supported(self) -> SupportedAsyncMethods:
229+
return SupportedAsyncMethods()
230+
226231
@staticmethod
227232
def pull_latest_from_table_or_query(
228233
config: RepoConfig,

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,18 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import asyncio
15-
import contextlib
1615
import itertools
1716
import logging
1817
from datetime import datetime
1918
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
2019

21-
from aiobotocore.config import AioConfig
2220
from pydantic import StrictBool, StrictStr
2321

2422
from feast import Entity, FeatureView, utils
2523
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
2624
from feast.infra.online_stores.helpers import compute_entity_id
2725
from feast.infra.online_stores.online_store import OnlineStore
26+
from feast.infra.supported_async_methods import SupportedAsyncMethods
2827
from feast.protos.feast.core.DynamoDBTable_pb2 import (
2928
DynamoDBTable as DynamoDBTableProto,
3029
)
@@ -76,9 +75,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7675
session_based_auth: bool = False
7776
"""AWS session based client authentication"""
7877

79-
max_pool_connections: int = 10
80-
"""Max number of connections for async Dynamodb operations"""
81-
8278

8379
class DynamoDBOnlineStore(OnlineStore):
8480
"""
@@ -91,14 +87,11 @@ class DynamoDBOnlineStore(OnlineStore):
9187

9288
_dynamodb_client = None
9389
_dynamodb_resource = None
90+
_aioboto_session = None
9491

95-
async def initialize(self, config: RepoConfig):
96-
await _get_aiodynamodb_client(
97-
config.online_store.region, config.online_store.max_pool_connections
98-
)
99-
100-
async def close(self):
101-
await _aiodynamodb_close()
92+
@property
93+
def async_supported(self) -> SupportedAsyncMethods:
94+
return SupportedAsyncMethods(read=True)
10295

10396
def update(
10497
self,
@@ -333,17 +326,15 @@ def to_tbl_resp(raw_client_response):
333326
batches.append(batch)
334327
entity_id_batches.append(entity_id_batch)
335328

336-
client = await _get_aiodynamodb_client(
337-
online_config.region, online_config.max_pool_connections
338-
)
339-
response_batches = await asyncio.gather(
340-
*[
341-
client.batch_get_item(
342-
RequestItems=entity_id_batch,
343-
)
344-
for entity_id_batch in entity_id_batches
345-
]
346-
)
329+
async with self._get_aiodynamodb_client(online_config.region) as client:
330+
response_batches = await asyncio.gather(
331+
*[
332+
client.batch_get_item(
333+
RequestItems=entity_id_batch,
334+
)
335+
for entity_id_batch in entity_id_batches
336+
]
337+
)
347338

348339
result_batches = []
349340
for batch, response in zip(batches, response_batches):
@@ -358,6 +349,14 @@ def to_tbl_resp(raw_client_response):
358349

359350
return list(itertools.chain(*result_batches))
360351

352+
def _get_aioboto_session(self):
353+
if self._aioboto_session is None:
354+
self._aioboto_session = session.get_session()
355+
return self._aioboto_session
356+
357+
def _get_aiodynamodb_client(self, region: str):
358+
return self._get_aioboto_session().create_client("dynamodb", region_name=region)
359+
361360
def _get_dynamodb_client(
362361
self,
363362
region: str,
@@ -490,38 +489,6 @@ def _to_client_batch_get_payload(online_config, table_name, batch):
490489
}
491490

492491

493-
_aioboto_session = None
494-
_aioboto_client = None
495-
496-
497-
def _get_aioboto_session():
498-
global _aioboto_session
499-
if _aioboto_session is None:
500-
logger.debug("initializing the aiobotocore session")
501-
_aioboto_session = session.get_session()
502-
return _aioboto_session
503-
504-
505-
async def _get_aiodynamodb_client(region: str, max_pool_connections: int):
506-
global _aioboto_client
507-
if _aioboto_client is None:
508-
logger.debug("initializing the aiobotocore dynamodb client")
509-
client_context = _get_aioboto_session().create_client(
510-
"dynamodb",
511-
region_name=region,
512-
config=AioConfig(max_pool_connections=max_pool_connections),
513-
)
514-
context_stack = contextlib.AsyncExitStack()
515-
_aioboto_client = await context_stack.enter_async_context(client_context)
516-
return _aioboto_client
517-
518-
519-
async def _aiodynamodb_close():
520-
global _aioboto_client
521-
if _aioboto_client:
522-
await _aioboto_client.close()
523-
524-
525492
def _initialize_dynamodb_client(
526493
region: str,
527494
endpoint_url: Optional[str] = None,

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from feast.feature_view import FeatureView
2323
from feast.infra.infra_object import InfraObject
2424
from feast.infra.registry.base_registry import BaseRegistry
25+
from feast.infra.supported_async_methods import SupportedAsyncMethods
2526
from feast.online_response import OnlineResponse
2627
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
2728
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -36,6 +37,10 @@ class OnlineStore(ABC):
3637
The interface that Feast uses to interact with the storage system that handles online features.
3738
"""
3839

40+
@property
41+
def async_supported(self) -> SupportedAsyncMethods:
42+
return SupportedAsyncMethods()
43+
3944
@abstractmethod
4045
def online_write_batch(
4146
self,
@@ -390,9 +395,3 @@ def retrieve_online_documents(
390395
raise NotImplementedError(
391396
f"Online store {self.__class__.__name__} does not support online retrieval"
392397
)
393-
394-
async def initialize(self, config: RepoConfig) -> None:
395-
pass
396-
397-
async def close(self) -> None:
398-
pass

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from feast.infra.online_stores.helpers import get_online_store_from_config
3636
from feast.infra.provider import Provider
3737
from feast.infra.registry.base_registry import BaseRegistry
38+
from feast.infra.supported_async_methods import ProviderAsyncMethods
3839
from feast.online_response import OnlineResponse
3940
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
4041
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -79,6 +80,13 @@ def offline_store(self):
7980
)
8081
return self._offline_store
8182

83+
@property
84+
def async_supported(self) -> ProviderAsyncMethods:
85+
return ProviderAsyncMethods(
86+
online=self.online_store.async_supported,
87+
offline=self.offline_store.async_supported,
88+
)
89+
8290
@property
8391
def batch_engine(self) -> BatchMaterializationEngine:
8492
if self._batch_engine:
@@ -473,9 +481,3 @@ def get_table_column_names_and_types_from_data_source(
473481
return self.offline_store.get_table_column_names_and_types_from_data_source(
474482
config=config, data_source=data_source
475483
)
476-
477-
async def initialize(self, config: RepoConfig) -> None:
478-
await self.online_store.initialize(config)
479-
480-
async def close(self) -> None:
481-
await self.online_store.close()

sdk/python/feast/infra/provider.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from feast.infra.infra_object import Infra
2828
from feast.infra.offline_stores.offline_store import RetrievalJob
2929
from feast.infra.registry.base_registry import BaseRegistry
30+
from feast.infra.supported_async_methods import ProviderAsyncMethods
3031
from feast.on_demand_feature_view import OnDemandFeatureView
3132
from feast.online_response import OnlineResponse
3233
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
@@ -55,6 +56,10 @@ class Provider(ABC):
5556
def __init__(self, config: RepoConfig):
5657
pass
5758

59+
@property
60+
def async_supported(self) -> ProviderAsyncMethods:
61+
return ProviderAsyncMethods()
62+
5863
@abstractmethod
5964
def update_infra(
6065
self,
@@ -429,14 +434,6 @@ def get_table_column_names_and_types_from_data_source(
429434
"""
430435
pass
431436

432-
@abstractmethod
433-
async def initialize(self, config: RepoConfig) -> None:
434-
pass
435-
436-
@abstractmethod
437-
async def close(self) -> None:
438-
pass
439-
440437

441438
def get_provider(config: RepoConfig) -> Provider:
442439
if "." not in config.provider:
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from pydantic import BaseModel, Field
2+
3+
4+
class SupportedAsyncMethods(BaseModel):
5+
read: bool = Field(default=False)
6+
write: bool = Field(default=False)
7+
8+
9+
class ProviderAsyncMethods(BaseModel):
10+
online: SupportedAsyncMethods = Field(default_factory=SupportedAsyncMethods)
11+
offline: SupportedAsyncMethods = Field(default_factory=SupportedAsyncMethods)

sdk/python/tests/foo_provider.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,3 @@ async def get_online_features_async(
184184
full_feature_names: bool = False,
185185
) -> OnlineResponse:
186186
pass
187-
188-
async def initialize(self, config: RepoConfig) -> None:
189-
pass
190-
191-
async def close(self) -> None:
192-
pass

0 commit comments

Comments
 (0)