Skip to content

Commit 275eac2

Browse files
committed
feat: Optimize DynamoDB online store for improved latency
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent c1718b7 commit 275eac2

File tree

2 files changed

+148
-74
lines changed

2 files changed

+148
-74
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 140 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -53,11 +53,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5353
type: Literal["dynamodb"] = "dynamodb"
5454
"""Online store type selector"""
5555

56-
batch_size: int = 40
57-
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
56+
batch_size: int = 100
57+
"""Number of items to retrieve in a DynamoDB BatchGetItem call.
58+
DynamoDB supports up to 100 items per BatchGetItem request."""
5859

5960
endpoint_url: Union[str, None] = None
60-
"""DynamoDB local development endpoint Url, i.e. http://localhost:8000"""
61+
"""DynamoDB endpoint URL. Use for local development (e.g., http://localhost:8000)
62+
or VPC endpoints for improved latency."""
6163

6264
region: StrictStr
6365
"""AWS Region Name"""
@@ -74,30 +76,33 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7476
session_based_auth: bool = False
7577
"""AWS session based client authentication"""
7678

77-
max_pool_connections: int = 10
78-
"""Max number of connections for async Dynamodb operations"""
79+
max_pool_connections: int = 50
80+
"""Max number of connections for async Dynamodb operations.
81+
Increase for high-throughput workloads."""
7982

80-
keepalive_timeout: float = 12.0
81-
"""Keep-alive timeout in seconds for async Dynamodb connections."""
83+
keepalive_timeout: float = 30.0
84+
"""Keep-alive timeout in seconds for async Dynamodb connections.
85+
Higher values help reuse connections under sustained load."""
8286

83-
connect_timeout: Union[int, float] = 60
87+
connect_timeout: Union[int, float] = 5
8488
"""The time in seconds until a timeout exception is thrown when attempting to make
85-
an async connection."""
89+
an async connection. Lower values enable faster failure detection."""
8690

87-
read_timeout: Union[int, float] = 60
91+
read_timeout: Union[int, float] = 10
8892
"""The time in seconds until a timeout exception is thrown when attempting to read
89-
from an async connection."""
93+
from an async connection. Lower values enable faster failure detection."""
9094

91-
total_max_retry_attempts: Union[int, None] = None
95+
total_max_retry_attempts: Union[int, None] = 3
9296
"""Maximum number of total attempts that will be made on a single request.
9397
9498
Maps to `retries.total_max_attempts` in botocore.config.Config.
9599
"""
96100

97-
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None
101+
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = "adaptive"
98102
"""The type of retry mode (aio)botocore should use.
99103
100104
Maps to `retries.mode` in botocore.config.Config.
105+
'adaptive' mode provides intelligent retry with client-side rate limiting.
101106
"""
102107

103108

@@ -111,16 +116,22 @@ class DynamoDBOnlineStore(OnlineStore):
111116
_aioboto_session: Async boto session.
112117
_aioboto_client: Async boto client.
113118
_aioboto_context_stack: Async context stack.
119+
_type_deserializer: Cached TypeDeserializer instance for performance.
114120
"""
115121

116122
_dynamodb_client = None
117123
_dynamodb_resource = None
124+
# Class-level cached TypeDeserializer to avoid per-request instantiation
125+
_type_deserializer: TypeDeserializer = None
118126

119127
def __init__(self):
120128
super().__init__()
121129
self._aioboto_session = None
122130
self._aioboto_client = None
123131
self._aioboto_context_stack = None
132+
# Initialize cached TypeDeserializer if not already done
133+
if DynamoDBOnlineStore._type_deserializer is None:
134+
DynamoDBOnlineStore._type_deserializer = TypeDeserializer()
124135

125136
async def initialize(self, config: RepoConfig):
126137
online_config = config.online_store
@@ -133,6 +144,7 @@ async def initialize(self, config: RepoConfig):
133144
online_config.read_timeout,
134145
online_config.total_max_retry_attempts,
135146
online_config.retry_mode,
147+
online_config.endpoint_url,
136148
)
137149

138150
async def close(self):
@@ -153,6 +165,7 @@ async def _get_aiodynamodb_client(
153165
read_timeout: Union[int, float],
154166
total_max_retry_attempts: Union[int, None],
155167
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
168+
endpoint_url: Optional[str] = None,
156169
):
157170
if self._aioboto_client is None:
158171
logger.debug("initializing the aiobotocore dynamodb client")
@@ -163,16 +176,23 @@ async def _get_aiodynamodb_client(
163176
if retry_mode is not None:
164177
retries["mode"] = retry_mode
165178

166-
client_context = self._get_aioboto_session().create_client(
167-
"dynamodb",
168-
region_name=region,
169-
config=AioConfig(
179+
# Build client kwargs, including endpoint_url for VPC endpoints or local testing
180+
client_kwargs: Dict[str, Any] = {
181+
"region_name": region,
182+
"config": AioConfig(
170183
max_pool_connections=max_pool_connections,
171184
connect_timeout=connect_timeout,
172185
read_timeout=read_timeout,
173186
retries=retries if retries else None,
174187
connector_args={"keepalive_timeout": keepalive_timeout},
175188
),
189+
}
190+
if endpoint_url:
191+
client_kwargs["endpoint_url"] = endpoint_url
192+
193+
client_context = self._get_aioboto_session().create_client(
194+
"dynamodb",
195+
**client_kwargs,
176196
)
177197
self._aioboto_context_stack = contextlib.AsyncExitStack()
178198
self._aioboto_client = (
@@ -431,6 +451,7 @@ async def online_write_batch_async(
431451
online_config.read_timeout,
432452
online_config.total_max_retry_attempts,
433453
online_config.retry_mode,
454+
online_config.endpoint_url,
434455
)
435456
await dynamo_write_items_async(client, table_name, items)
436457

@@ -448,6 +469,7 @@ def online_read(
448469
config: The RepoConfig for the current FeatureStore.
449470
table: Feast FeatureView.
450471
entity_keys: a list of entity keys that should be read from the FeatureStore.
472+
requested_features: Optional list of feature names to retrieve.
451473
"""
452474
online_config = config.online_store
453475
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
@@ -479,7 +501,9 @@ def online_read(
479501
RequestItems=batch_entity_ids,
480502
)
481503
batch_result = self._process_batch_get_response(
482-
table_instance.name, response, entity_ids, batch
504+
table_instance.name,
505+
response,
506+
batch,
483507
)
484508
result.extend(batch_result)
485509
return result
@@ -513,7 +537,8 @@ async def online_read_async(
513537
entity_ids_iter = iter(entity_ids)
514538
table_name = _get_table_name(online_config, config, table)
515539

516-
deserialize = TypeDeserializer().deserialize
540+
# Use cached TypeDeserializer for better performance
541+
deserialize = self._type_deserializer.deserialize
517542

518543
def to_tbl_resp(raw_client_response):
519544
return {
@@ -542,6 +567,7 @@ def to_tbl_resp(raw_client_response):
542567
online_config.read_timeout,
543568
online_config.total_max_retry_attempts,
544569
online_config.retry_mode,
570+
online_config.endpoint_url,
545571
)
546572
response_batches = await asyncio.gather(
547573
*[
@@ -557,7 +583,6 @@ def to_tbl_resp(raw_client_response):
557583
result_batch = self._process_batch_get_response(
558584
table_name,
559585
response,
560-
entity_ids,
561586
batch,
562587
to_tbl_response=to_tbl_resp,
563588
)
@@ -589,26 +614,6 @@ def _get_dynamodb_resource(
589614
)
590615
return self._dynamodb_resource
591616

592-
def _sort_dynamodb_response(
593-
self,
594-
responses: list,
595-
order: list,
596-
to_tbl_response: Callable = lambda raw_dict: raw_dict,
597-
) -> Any:
598-
"""DynamoDB Batch Get Item doesn't return items in a particular order."""
599-
# Assign an index to order
600-
order_with_index = {value: idx for idx, value in enumerate(order)}
601-
# Sort table responses by index
602-
table_responses_ordered: Any = [
603-
(order_with_index[tbl_res["entity_id"]], tbl_res)
604-
for tbl_res in map(to_tbl_response, responses)
605-
]
606-
table_responses_ordered = sorted(
607-
table_responses_ordered, key=lambda tup: tup[0]
608-
)
609-
_, table_responses_ordered = zip(*table_responses_ordered)
610-
return table_responses_ordered
611-
612617
def _write_batch_non_duplicates(
613618
self,
614619
table_instance,
@@ -630,44 +635,106 @@ def _write_batch_non_duplicates(
630635
progress(1)
631636

632637
def _process_batch_get_response(
633-
self, table_name, response, entity_ids, batch, **sort_kwargs
634-
):
635-
response = response.get("Responses")
636-
table_responses = response.get(table_name)
638+
self,
639+
table_name: str,
640+
response: Dict[str, Any],
641+
batch: List[str],
642+
to_tbl_response: Callable = lambda raw_dict: raw_dict,
643+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
644+
"""Process batch get response using O(1) dictionary lookup.
637645
638-
batch_result = []
639-
if table_responses:
640-
table_responses = self._sort_dynamodb_response(
641-
table_responses, entity_ids, **sort_kwargs
642-
)
643-
entity_idx = 0
644-
for tbl_res in table_responses:
645-
entity_id = tbl_res["entity_id"]
646-
while entity_id != batch[entity_idx]:
647-
batch_result.append((None, None))
648-
entity_idx += 1
649-
res = {}
650-
for feature_name, value_bin in tbl_res["values"].items():
646+
DynamoDB BatchGetItem doesn't return items in a particular order,
647+
so we use a dictionary for O(1) lookup instead of O(n log n) sorting.
648+
649+
This method:
650+
- Uses dictionary lookup instead of sorting for response ordering
651+
- Pre-allocates the result list with None values
652+
- Minimizes object creation in the hot path
653+
654+
Args:
655+
table_name: Name of the DynamoDB table
656+
response: Raw response from DynamoDB batch_get_item
657+
batch: List of entity_ids in the order they should be returned
658+
to_tbl_response: Function to transform raw DynamoDB response items
659+
(used for async client responses that need deserialization)
660+
661+
Returns:
662+
List of (timestamp, features) tuples in the same order as batch
663+
"""
664+
responses_data = response.get("Responses")
665+
if not responses_data:
666+
# No responses at all, return all None tuples
667+
return [(None, None)] * len(batch)
668+
669+
table_responses = responses_data.get(table_name)
670+
if not table_responses:
671+
# No responses for this table, return all None tuples
672+
return [(None, None)] * len(batch)
673+
674+
# Build a dictionary for O(1) lookup instead of O(n log n) sorting
675+
response_dict: Dict[str, Any] = {
676+
tbl_res["entity_id"]: tbl_res
677+
for tbl_res in map(to_tbl_response, table_responses)
678+
}
679+
680+
# Pre-allocate result list with None tuples (faster than appending)
681+
batch_size = len(batch)
682+
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [
683+
(None, None)
684+
] * batch_size
685+
686+
# Process each entity in batch order using O(1) dict lookup
687+
for idx, entity_id in enumerate(batch):
688+
tbl_res = response_dict.get(entity_id)
689+
if tbl_res is not None:
690+
# Parse feature values
691+
features: Dict[str, ValueProto] = {}
692+
values_data = tbl_res["values"]
693+
for feature_name, value_bin in values_data.items():
651694
val = ValueProto()
652695
val.ParseFromString(value_bin.value)
653-
res[feature_name] = val
654-
batch_result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
655-
entity_idx += 1
656-
# Not all entities in a batch may have responses
657-
# Pad with remaining values in batch that were not found
658-
batch_size_nones = ((None, None),) * (len(batch) - len(batch_result))
659-
batch_result.extend(batch_size_nones)
660-
return batch_result
696+
features[feature_name] = val
697+
698+
# Parse timestamp and set result
699+
result[idx] = (
700+
datetime.fromisoformat(tbl_res["event_ts"]),
701+
features,
702+
)
703+
704+
return result
661705

662706
@staticmethod
663-
def _to_entity_ids(config: RepoConfig, entity_keys: List[EntityKeyProto]):
664-
return [
665-
compute_entity_id(
666-
entity_key,
667-
entity_key_serialization_version=config.entity_key_serialization_version,
668-
)
669-
for entity_key in entity_keys
670-
]
707+
def _to_entity_ids(
708+
config: RepoConfig, entity_keys: List[EntityKeyProto]
709+
) -> List[str]:
710+
"""Convert entity keys to entity IDs with caching for repeated entities.
711+
712+
This method caches entity_id computations within a single request to avoid
713+
redundant hashing for duplicate entity keys.
714+
"""
715+
# Use a cache to avoid recomputing entity_ids for duplicate entity keys
716+
# The cache key is the serialized proto (which is deterministic)
717+
entity_id_cache: Dict[bytes, str] = {}
718+
entity_ids: List[str] = []
719+
serialization_version = config.entity_key_serialization_version
720+
721+
for entity_key in entity_keys:
722+
# Use serialized proto as cache key
723+
cache_key = entity_key.SerializeToString()
724+
725+
if cache_key in entity_id_cache:
726+
# Cache hit - reuse computed entity_id
727+
entity_ids.append(entity_id_cache[cache_key])
728+
else:
729+
# Cache miss - compute and store
730+
entity_id = compute_entity_id(
731+
entity_key,
732+
entity_key_serialization_version=serialization_version,
733+
)
734+
entity_id_cache[cache_key] = entity_id
735+
entity_ids.append(entity_id)
736+
737+
return entity_ids
671738

672739
@staticmethod
673740
def _to_resource_batch_get_payload(online_config, table_name, batch):

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,10 +64,17 @@ def test_dynamodb_online_store_config_default():
6464
aws_region = "us-west-2"
6565
dynamodb_store_config = DynamoDBOnlineStoreConfig(region=aws_region)
6666
assert dynamodb_store_config.type == "dynamodb"
67-
assert dynamodb_store_config.batch_size == 40
67+
assert dynamodb_store_config.batch_size == 100
6868
assert dynamodb_store_config.endpoint_url is None
6969
assert dynamodb_store_config.region == aws_region
7070
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"
71+
# Verify other optimized defaults
72+
assert dynamodb_store_config.max_pool_connections == 50
73+
assert dynamodb_store_config.keepalive_timeout == 30.0
74+
assert dynamodb_store_config.connect_timeout == 5
75+
assert dynamodb_store_config.read_timeout == 10
76+
assert dynamodb_store_config.total_max_retry_attempts == 3
77+
assert dynamodb_store_config.retry_mode == "adaptive"
7178

7279

7380
def test_dynamodb_online_store_config_custom_params():

0 commit comments

Comments
 (0)