1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414import asyncio
15- import contextlib
1615import itertools
1716import logging
1817from datetime import datetime
1918from typing import Any , Callable , Dict , List , Literal , Optional , Sequence , Tuple , Union
2019
21- from aiobotocore .config import AioConfig
2220from pydantic import StrictBool , StrictStr
2321
2422from feast import Entity , FeatureView , utils
2523from feast .infra .infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE , InfraObject
2624from feast .infra .online_stores .helpers import compute_entity_id
2725from feast .infra .online_stores .online_store import OnlineStore
26+ from feast .infra .supported_async_methods import SupportedAsyncMethods
2827from feast .protos .feast .core .DynamoDBTable_pb2 import (
2928 DynamoDBTable as DynamoDBTableProto ,
3029)
@@ -76,9 +75,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7675 session_based_auth : bool = False
7776 """AWS session based client authentication"""
7877
79- max_pool_connections : int = 10
80- """Max number of connections for async Dynamodb operations"""
81-
8278
8379class DynamoDBOnlineStore (OnlineStore ):
8480 """
@@ -91,14 +87,11 @@ class DynamoDBOnlineStore(OnlineStore):
9187
9288 _dynamodb_client = None
9389 _dynamodb_resource = None
90+ _aioboto_session = None
9491
95- async def initialize (self , config : RepoConfig ):
96- await _get_aiodynamodb_client (
97- config .online_store .region , config .online_store .max_pool_connections
98- )
99-
100- async def close (self ):
101- await _aiodynamodb_close ()
92+ @property
93+ def async_supported (self ) -> SupportedAsyncMethods :
94+ return SupportedAsyncMethods (read = True )
10295
10396 def update (
10497 self ,
@@ -333,17 +326,15 @@ def to_tbl_resp(raw_client_response):
333326 batches .append (batch )
334327 entity_id_batches .append (entity_id_batch )
335328
336- client = await _get_aiodynamodb_client (
337- online_config .region , online_config .max_pool_connections
338- )
339- response_batches = await asyncio .gather (
340- * [
341- client .batch_get_item (
342- RequestItems = entity_id_batch ,
343- )
344- for entity_id_batch in entity_id_batches
345- ]
346- )
329+ async with self ._get_aiodynamodb_client (online_config .region ) as client :
330+ response_batches = await asyncio .gather (
331+ * [
332+ client .batch_get_item (
333+ RequestItems = entity_id_batch ,
334+ )
335+ for entity_id_batch in entity_id_batches
336+ ]
337+ )
347338
348339 result_batches = []
349340 for batch , response in zip (batches , response_batches ):
@@ -358,6 +349,14 @@ def to_tbl_resp(raw_client_response):
358349
359350 return list (itertools .chain (* result_batches ))
360351
352+ def _get_aioboto_session (self ):
353+ if self ._aioboto_session is None :
354+ self ._aioboto_session = session .get_session ()
355+ return self ._aioboto_session
356+
357+ def _get_aiodynamodb_client (self , region : str ):
358+ return self ._get_aioboto_session ().create_client ("dynamodb" , region_name = region )
359+
361360 def _get_dynamodb_client (
362361 self ,
363362 region : str ,
@@ -490,38 +489,6 @@ def _to_client_batch_get_payload(online_config, table_name, batch):
490489 }
491490
492491
493- _aioboto_session = None
494- _aioboto_client = None
495-
496-
497- def _get_aioboto_session ():
498- global _aioboto_session
499- if _aioboto_session is None :
500- logger .debug ("initializing the aiobotocore session" )
501- _aioboto_session = session .get_session ()
502- return _aioboto_session
503-
504-
505- async def _get_aiodynamodb_client (region : str , max_pool_connections : int ):
506- global _aioboto_client
507- if _aioboto_client is None :
508- logger .debug ("initializing the aiobotocore dynamodb client" )
509- client_context = _get_aioboto_session ().create_client (
510- "dynamodb" ,
511- region_name = region ,
512- config = AioConfig (max_pool_connections = max_pool_connections ),
513- )
514- context_stack = contextlib .AsyncExitStack ()
515- _aioboto_client = await context_stack .enter_async_context (client_context )
516- return _aioboto_client
517-
518-
519- async def _aiodynamodb_close ():
520- global _aioboto_client
521- if _aioboto_client :
522- await _aioboto_client .close ()
523-
524-
525492def _initialize_dynamodb_client (
526493 region : str ,
527494 endpoint_url : Optional [str ] = None ,
0 commit comments