@@ -53,11 +53,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5353 type : Literal ["dynamodb" ] = "dynamodb"
5454 """Online store type selector"""
5555
56- batch_size : int = 40
57- """Number of items to retrieve in a DynamoDB BatchGetItem call."""
56+ batch_size : int = 100
57+ """Number of items to retrieve in a DynamoDB BatchGetItem call.
58+ DynamoDB supports up to 100 items per BatchGetItem request."""
5859
5960 endpoint_url : Union [str , None ] = None
60- """DynamoDB local development endpoint Url, i.e. http://localhost:8000"""
61+ """DynamoDB endpoint URL. Use for local development (e.g., http://localhost:8000)
62+ or VPC endpoints for improved latency."""
6163
6264 region : StrictStr
6365 """AWS Region Name"""
@@ -74,30 +76,33 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7476 session_based_auth : bool = False
7577 """AWS session based client authentication"""
7678
77- max_pool_connections : int = 10
78- """Max number of connections for async Dynamodb operations"""
79+ max_pool_connections : int = 50
80+ """Max number of connections for async Dynamodb operations.
81+ Increase for high-throughput workloads."""
7982
80- keepalive_timeout : float = 12.0
81- """Keep-alive timeout in seconds for async Dynamodb connections."""
83+ keepalive_timeout : float = 30.0
84+ """Keep-alive timeout in seconds for async Dynamodb connections.
85+ Higher values help reuse connections under sustained load."""
8286
83- connect_timeout : Union [int , float ] = 60
87+ connect_timeout : Union [int , float ] = 5
8488 """The time in seconds until a timeout exception is thrown when attempting to make
85- an async connection."""
89+ an async connection. Lower values enable faster failure detection. """
8690
87- read_timeout : Union [int , float ] = 60
91+ read_timeout : Union [int , float ] = 10
8892 """The time in seconds until a timeout exception is thrown when attempting to read
89- from an async connection."""
93+ from an async connection. Lower values enable faster failure detection. """
9094
91- total_max_retry_attempts : Union [int , None ] = None
95+ total_max_retry_attempts : Union [int , None ] = 3
9296 """Maximum number of total attempts that will be made on a single request.
9397
9498 Maps to `retries.total_max_attempts` in botocore.config.Config.
9599 """
96100
97- retry_mode : Union [Literal ["legacy" , "standard" , "adaptive" ], None ] = None
101+ retry_mode : Union [Literal ["legacy" , "standard" , "adaptive" ], None ] = "adaptive"
98102 """The type of retry mode (aio)botocore should use.
99103
100104 Maps to `retries.mode` in botocore.config.Config.
105+ 'adaptive' mode provides intelligent retry with client-side rate limiting.
101106 """
102107
103108
@@ -111,16 +116,22 @@ class DynamoDBOnlineStore(OnlineStore):
111116 _aioboto_session: Async boto session.
112117 _aioboto_client: Async boto client.
113118 _aioboto_context_stack: Async context stack.
119+ _type_deserializer: Cached TypeDeserializer instance for performance.
114120 """
115121
116122 _dynamodb_client = None
117123 _dynamodb_resource = None
124+ # Class-level cached TypeDeserializer to avoid per-request instantiation
125+ _type_deserializer : TypeDeserializer = None
118126
119127 def __init__ (self ):
120128 super ().__init__ ()
121129 self ._aioboto_session = None
122130 self ._aioboto_client = None
123131 self ._aioboto_context_stack = None
132+ # Initialize cached TypeDeserializer if not already done
133+ if DynamoDBOnlineStore ._type_deserializer is None :
134+ DynamoDBOnlineStore ._type_deserializer = TypeDeserializer ()
124135
125136 async def initialize (self , config : RepoConfig ):
126137 online_config = config .online_store
@@ -133,6 +144,7 @@ async def initialize(self, config: RepoConfig):
133144 online_config .read_timeout ,
134145 online_config .total_max_retry_attempts ,
135146 online_config .retry_mode ,
147+ online_config .endpoint_url ,
136148 )
137149
138150 async def close (self ):
@@ -153,6 +165,7 @@ async def _get_aiodynamodb_client(
153165 read_timeout : Union [int , float ],
154166 total_max_retry_attempts : Union [int , None ],
155167 retry_mode : Union [Literal ["legacy" , "standard" , "adaptive" ], None ],
168+ endpoint_url : Optional [str ] = None ,
156169 ):
157170 if self ._aioboto_client is None :
158171 logger .debug ("initializing the aiobotocore dynamodb client" )
@@ -163,16 +176,23 @@ async def _get_aiodynamodb_client(
163176 if retry_mode is not None :
164177 retries ["mode" ] = retry_mode
165178
166- client_context = self . _get_aioboto_session (). create_client (
167- "dynamodb" ,
168- region_name = region ,
169- config = AioConfig (
179+ # Build client kwargs, including endpoint_url for VPC endpoints or local testing
180+ client_kwargs : Dict [ str , Any ] = {
181+ " region_name" : region ,
182+ " config" : AioConfig (
170183 max_pool_connections = max_pool_connections ,
171184 connect_timeout = connect_timeout ,
172185 read_timeout = read_timeout ,
173186 retries = retries if retries else None ,
174187 connector_args = {"keepalive_timeout" : keepalive_timeout },
175188 ),
189+ }
190+ if endpoint_url :
191+ client_kwargs ["endpoint_url" ] = endpoint_url
192+
193+ client_context = self ._get_aioboto_session ().create_client (
194+ "dynamodb" ,
195+ ** client_kwargs ,
176196 )
177197 self ._aioboto_context_stack = contextlib .AsyncExitStack ()
178198 self ._aioboto_client = (
@@ -431,6 +451,7 @@ async def online_write_batch_async(
431451 online_config .read_timeout ,
432452 online_config .total_max_retry_attempts ,
433453 online_config .retry_mode ,
454+ online_config .endpoint_url ,
434455 )
435456 await dynamo_write_items_async (client , table_name , items )
436457
@@ -448,6 +469,7 @@ def online_read(
448469 config: The RepoConfig for the current FeatureStore.
449470 table: Feast FeatureView.
450471 entity_keys: a list of entity keys that should be read from the FeatureStore.
472+ requested_features: Optional list of feature names to retrieve.
451473 """
452474 online_config = config .online_store
453475 assert isinstance (online_config , DynamoDBOnlineStoreConfig )
@@ -479,7 +501,9 @@ def online_read(
479501 RequestItems = batch_entity_ids ,
480502 )
481503 batch_result = self ._process_batch_get_response (
482- table_instance .name , response , entity_ids , batch
504+ table_instance .name ,
505+ response ,
506+ batch ,
483507 )
484508 result .extend (batch_result )
485509 return result
@@ -513,7 +537,8 @@ async def online_read_async(
513537 entity_ids_iter = iter (entity_ids )
514538 table_name = _get_table_name (online_config , config , table )
515539
516- deserialize = TypeDeserializer ().deserialize
540+ # Use cached TypeDeserializer for better performance
541+ deserialize = self ._type_deserializer .deserialize
517542
518543 def to_tbl_resp (raw_client_response ):
519544 return {
@@ -542,6 +567,7 @@ def to_tbl_resp(raw_client_response):
542567 online_config .read_timeout ,
543568 online_config .total_max_retry_attempts ,
544569 online_config .retry_mode ,
570+ online_config .endpoint_url ,
545571 )
546572 response_batches = await asyncio .gather (
547573 * [
@@ -557,7 +583,6 @@ def to_tbl_resp(raw_client_response):
557583 result_batch = self ._process_batch_get_response (
558584 table_name ,
559585 response ,
560- entity_ids ,
561586 batch ,
562587 to_tbl_response = to_tbl_resp ,
563588 )
@@ -589,26 +614,6 @@ def _get_dynamodb_resource(
589614 )
590615 return self ._dynamodb_resource
591616
592- def _sort_dynamodb_response (
593- self ,
594- responses : list ,
595- order : list ,
596- to_tbl_response : Callable = lambda raw_dict : raw_dict ,
597- ) -> Any :
598- """DynamoDB Batch Get Item doesn't return items in a particular order."""
599- # Assign an index to order
600- order_with_index = {value : idx for idx , value in enumerate (order )}
601- # Sort table responses by index
602- table_responses_ordered : Any = [
603- (order_with_index [tbl_res ["entity_id" ]], tbl_res )
604- for tbl_res in map (to_tbl_response , responses )
605- ]
606- table_responses_ordered = sorted (
607- table_responses_ordered , key = lambda tup : tup [0 ]
608- )
609- _ , table_responses_ordered = zip (* table_responses_ordered )
610- return table_responses_ordered
611-
612617 def _write_batch_non_duplicates (
613618 self ,
614619 table_instance ,
@@ -630,44 +635,106 @@ def _write_batch_non_duplicates(
630635 progress (1 )
631636
632637 def _process_batch_get_response (
633- self , table_name , response , entity_ids , batch , ** sort_kwargs
634- ):
635- response = response .get ("Responses" )
636- table_responses = response .get (table_name )
638+ self ,
639+ table_name : str ,
640+ response : Dict [str , Any ],
641+ batch : List [str ],
642+ to_tbl_response : Callable = lambda raw_dict : raw_dict ,
643+ ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]]:
644+ """Process batch get response using O(1) dictionary lookup.
637645
638- batch_result = []
639- if table_responses :
640- table_responses = self ._sort_dynamodb_response (
641- table_responses , entity_ids , ** sort_kwargs
642- )
643- entity_idx = 0
644- for tbl_res in table_responses :
645- entity_id = tbl_res ["entity_id" ]
646- while entity_id != batch [entity_idx ]:
647- batch_result .append ((None , None ))
648- entity_idx += 1
649- res = {}
650- for feature_name , value_bin in tbl_res ["values" ].items ():
646+ DynamoDB BatchGetItem doesn't return items in a particular order,
647+ so we use a dictionary for O(1) lookup instead of O(n log n) sorting.
648+
649+ This method:
650+ - Uses dictionary lookup instead of sorting for response ordering
651+ - Pre-allocates the result list with None values
652+ - Minimizes object creation in the hot path
653+
654+ Args:
655+ table_name: Name of the DynamoDB table
656+ response: Raw response from DynamoDB batch_get_item
657+ batch: List of entity_ids in the order they should be returned
658+ to_tbl_response: Function to transform raw DynamoDB response items
659+ (used for async client responses that need deserialization)
660+
661+ Returns:
662+ List of (timestamp, features) tuples in the same order as batch
663+ """
664+ responses_data = response .get ("Responses" )
665+ if not responses_data :
666+ # No responses at all, return all None tuples
667+ return [(None , None )] * len (batch )
668+
669+ table_responses = responses_data .get (table_name )
670+ if not table_responses :
671+ # No responses for this table, return all None tuples
672+ return [(None , None )] * len (batch )
673+
674+ # Build a dictionary for O(1) lookup instead of O(n log n) sorting
675+ response_dict : Dict [str , Any ] = {
676+ tbl_res ["entity_id" ]: tbl_res
677+ for tbl_res in map (to_tbl_response , table_responses )
678+ }
679+
680+ # Pre-allocate result list with None tuples (faster than appending)
681+ batch_size = len (batch )
682+ result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = [
683+ (None , None )
684+ ] * batch_size
685+
686+ # Process each entity in batch order using O(1) dict lookup
687+ for idx , entity_id in enumerate (batch ):
688+ tbl_res = response_dict .get (entity_id )
689+ if tbl_res is not None :
690+ # Parse feature values
691+ features : Dict [str , ValueProto ] = {}
692+ values_data = tbl_res ["values" ]
693+ for feature_name , value_bin in values_data .items ():
651694 val = ValueProto ()
652695 val .ParseFromString (value_bin .value )
653- res [feature_name ] = val
654- batch_result .append ((datetime .fromisoformat (tbl_res ["event_ts" ]), res ))
655- entity_idx += 1
656- # Not all entities in a batch may have responses
657- # Pad with remaining values in batch that were not found
658- batch_size_nones = ((None , None ),) * (len (batch ) - len (batch_result ))
659- batch_result .extend (batch_size_nones )
660- return batch_result
696+ features [feature_name ] = val
697+
698+ # Parse timestamp and set result
699+ result [idx ] = (
700+ datetime .fromisoformat (tbl_res ["event_ts" ]),
701+ features ,
702+ )
703+
704+ return result
661705
662706 @staticmethod
663- def _to_entity_ids (config : RepoConfig , entity_keys : List [EntityKeyProto ]):
664- return [
665- compute_entity_id (
666- entity_key ,
667- entity_key_serialization_version = config .entity_key_serialization_version ,
668- )
669- for entity_key in entity_keys
670- ]
707+ def _to_entity_ids (
708+ config : RepoConfig , entity_keys : List [EntityKeyProto ]
709+ ) -> List [str ]:
710+ """Convert entity keys to entity IDs with caching for repeated entities.
711+
712+ This method caches entity_id computations within a single request to avoid
713+ redundant hashing for duplicate entity keys.
714+ """
715+ # Use a cache to avoid recomputing entity_ids for duplicate entity keys
716+ # The cache key is the serialized proto (which is deterministic)
717+ entity_id_cache : Dict [bytes , str ] = {}
718+ entity_ids : List [str ] = []
719+ serialization_version = config .entity_key_serialization_version
720+
721+ for entity_key in entity_keys :
722+ # Use serialized proto as cache key
723+ cache_key = entity_key .SerializeToString ()
724+
725+ if cache_key in entity_id_cache :
726+ # Cache hit - reuse computed entity_id
727+ entity_ids .append (entity_id_cache [cache_key ])
728+ else :
729+ # Cache miss - compute and store
730+ entity_id = compute_entity_id (
731+ entity_key ,
732+ entity_key_serialization_version = serialization_version ,
733+ )
734+ entity_id_cache [cache_key ] = entity_id
735+ entity_ids .append (entity_id )
736+
737+ return entity_ids
671738
672739 @staticmethod
673740 def _to_resource_batch_get_payload (online_config , table_name , batch ):
0 commit comments