Skip to content

Commit bb4d75a

Browse files
Merge branch 'master' into ray_azure_integration
2 parents ba449a4 + fcc8274 commit bb4d75a

File tree

2 files changed

+120
-66
lines changed

2 files changed

+120
-66
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 112 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5353
type: Literal["dynamodb"] = "dynamodb"
5454
"""Online store type selector"""
5555

56-
batch_size: int = 40
57-
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
56+
batch_size: int = 100
57+
"""Number of items to retrieve in a DynamoDB BatchGetItem call.
58+
DynamoDB supports up to 100 items per BatchGetItem request."""
5859

5960
endpoint_url: Union[str, None] = None
60-
"""DynamoDB local development endpoint Url, i.e. http://localhost:8000"""
61+
"""DynamoDB endpoint URL. Use for local development (e.g., http://localhost:8000)
62+
or VPC endpoints for improved latency."""
6163

6264
region: StrictStr
6365
"""AWS Region Name"""
@@ -74,30 +76,33 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7476
session_based_auth: bool = False
7577
"""AWS session based client authentication"""
7678

77-
max_pool_connections: int = 10
78-
"""Max number of connections for async Dynamodb operations"""
79+
max_pool_connections: int = 50
80+
"""Max number of connections for async Dynamodb operations.
81+
Increase for high-throughput workloads."""
7982

80-
keepalive_timeout: float = 12.0
81-
"""Keep-alive timeout in seconds for async Dynamodb connections."""
83+
keepalive_timeout: float = 30.0
84+
"""Keep-alive timeout in seconds for async Dynamodb connections.
85+
Higher values help reuse connections under sustained load."""
8286

83-
connect_timeout: Union[int, float] = 60
87+
connect_timeout: Union[int, float] = 5
8488
"""The time in seconds until a timeout exception is thrown when attempting to make
85-
an async connection."""
89+
an async connection. Lower values enable faster failure detection."""
8690

87-
read_timeout: Union[int, float] = 60
91+
read_timeout: Union[int, float] = 10
8892
"""The time in seconds until a timeout exception is thrown when attempting to read
89-
from an async connection."""
93+
from an async connection. Lower values enable faster failure detection."""
9094

91-
total_max_retry_attempts: Union[int, None] = None
95+
total_max_retry_attempts: Union[int, None] = 3
9296
"""Maximum number of total attempts that will be made on a single request.
9397
9498
Maps to `retries.total_max_attempts` in botocore.config.Config.
9599
"""
96100

97-
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None
101+
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = "adaptive"
98102
"""The type of retry mode (aio)botocore should use.
99103
100104
Maps to `retries.mode` in botocore.config.Config.
105+
'adaptive' mode provides intelligent retry with client-side rate limiting.
101106
"""
102107

103108

@@ -111,16 +116,22 @@ class DynamoDBOnlineStore(OnlineStore):
111116
_aioboto_session: Async boto session.
112117
_aioboto_client: Async boto client.
113118
_aioboto_context_stack: Async context stack.
119+
_type_deserializer: Cached TypeDeserializer instance for performance.
114120
"""
115121

116122
_dynamodb_client = None
117123
_dynamodb_resource = None
124+
# Class-level cached TypeDeserializer to avoid per-request instantiation
125+
_type_deserializer: Optional[TypeDeserializer] = None
118126

119127
def __init__(self):
120128
super().__init__()
121129
self._aioboto_session = None
122130
self._aioboto_client = None
123131
self._aioboto_context_stack = None
132+
# Initialize cached TypeDeserializer if not already done
133+
if DynamoDBOnlineStore._type_deserializer is None:
134+
DynamoDBOnlineStore._type_deserializer = TypeDeserializer()
124135

125136
async def initialize(self, config: RepoConfig):
126137
online_config = config.online_store
@@ -133,6 +144,7 @@ async def initialize(self, config: RepoConfig):
133144
online_config.read_timeout,
134145
online_config.total_max_retry_attempts,
135146
online_config.retry_mode,
147+
online_config.endpoint_url,
136148
)
137149

138150
async def close(self):
@@ -153,6 +165,7 @@ async def _get_aiodynamodb_client(
153165
read_timeout: Union[int, float],
154166
total_max_retry_attempts: Union[int, None],
155167
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
168+
endpoint_url: Optional[str] = None,
156169
):
157170
if self._aioboto_client is None:
158171
logger.debug("initializing the aiobotocore dynamodb client")
@@ -163,16 +176,23 @@ async def _get_aiodynamodb_client(
163176
if retry_mode is not None:
164177
retries["mode"] = retry_mode
165178

166-
client_context = self._get_aioboto_session().create_client(
167-
"dynamodb",
168-
region_name=region,
169-
config=AioConfig(
179+
# Build client kwargs, including endpoint_url for VPC endpoints or local testing
180+
client_kwargs: Dict[str, Any] = {
181+
"region_name": region,
182+
"config": AioConfig(
170183
max_pool_connections=max_pool_connections,
171184
connect_timeout=connect_timeout,
172185
read_timeout=read_timeout,
173186
retries=retries if retries else None,
174187
connector_args={"keepalive_timeout": keepalive_timeout},
175188
),
189+
}
190+
if endpoint_url:
191+
client_kwargs["endpoint_url"] = endpoint_url
192+
193+
client_context = self._get_aioboto_session().create_client(
194+
"dynamodb",
195+
**client_kwargs,
176196
)
177197
self._aioboto_context_stack = contextlib.AsyncExitStack()
178198
self._aioboto_client = (
@@ -431,6 +451,7 @@ async def online_write_batch_async(
431451
online_config.read_timeout,
432452
online_config.total_max_retry_attempts,
433453
online_config.retry_mode,
454+
online_config.endpoint_url,
434455
)
435456
await dynamo_write_items_async(client, table_name, items)
436457

@@ -448,6 +469,7 @@ def online_read(
448469
config: The RepoConfig for the current FeatureStore.
449470
table: Feast FeatureView.
450471
entity_keys: a list of entity keys that should be read from the FeatureStore.
472+
requested_features: Optional list of feature names to retrieve.
451473
"""
452474
online_config = config.online_store
453475
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
@@ -479,7 +501,9 @@ def online_read(
479501
RequestItems=batch_entity_ids,
480502
)
481503
batch_result = self._process_batch_get_response(
482-
table_instance.name, response, entity_ids, batch
504+
table_instance.name,
505+
response,
506+
batch,
483507
)
484508
result.extend(batch_result)
485509
return result
@@ -513,7 +537,10 @@ async def online_read_async(
513537
entity_ids_iter = iter(entity_ids)
514538
table_name = _get_table_name(online_config, config, table)
515539

516-
deserialize = TypeDeserializer().deserialize
540+
# Use cached TypeDeserializer for better performance
541+
if self._type_deserializer is None:
542+
self._type_deserializer = TypeDeserializer()
543+
deserialize = self._type_deserializer.deserialize
517544

518545
def to_tbl_resp(raw_client_response):
519546
return {
@@ -542,6 +569,7 @@ def to_tbl_resp(raw_client_response):
542569
online_config.read_timeout,
543570
online_config.total_max_retry_attempts,
544571
online_config.retry_mode,
572+
online_config.endpoint_url,
545573
)
546574
response_batches = await asyncio.gather(
547575
*[
@@ -557,7 +585,6 @@ def to_tbl_resp(raw_client_response):
557585
result_batch = self._process_batch_get_response(
558586
table_name,
559587
response,
560-
entity_ids,
561588
batch,
562589
to_tbl_response=to_tbl_resp,
563590
)
@@ -589,26 +616,6 @@ def _get_dynamodb_resource(
589616
)
590617
return self._dynamodb_resource
591618

592-
def _sort_dynamodb_response(
593-
self,
594-
responses: list,
595-
order: list,
596-
to_tbl_response: Callable = lambda raw_dict: raw_dict,
597-
) -> Any:
598-
"""DynamoDB Batch Get Item doesn't return items in a particular order."""
599-
# Assign an index to order
600-
order_with_index = {value: idx for idx, value in enumerate(order)}
601-
# Sort table responses by index
602-
table_responses_ordered: Any = [
603-
(order_with_index[tbl_res["entity_id"]], tbl_res)
604-
for tbl_res in map(to_tbl_response, responses)
605-
]
606-
table_responses_ordered = sorted(
607-
table_responses_ordered, key=lambda tup: tup[0]
608-
)
609-
_, table_responses_ordered = zip(*table_responses_ordered)
610-
return table_responses_ordered
611-
612619
def _write_batch_non_duplicates(
613620
self,
614621
table_instance,
@@ -630,37 +637,77 @@ def _write_batch_non_duplicates(
630637
progress(1)
631638

632639
def _process_batch_get_response(
633-
self, table_name, response, entity_ids, batch, **sort_kwargs
634-
):
635-
response = response.get("Responses")
636-
table_responses = response.get(table_name)
640+
self,
641+
table_name: str,
642+
response: Dict[str, Any],
643+
batch: List[str],
644+
to_tbl_response: Callable = lambda raw_dict: raw_dict,
645+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
646+
"""Process batch get response using O(1) dictionary lookup.
637647
638-
batch_result = []
639-
if table_responses:
640-
table_responses = self._sort_dynamodb_response(
641-
table_responses, entity_ids, **sort_kwargs
642-
)
643-
entity_idx = 0
644-
for tbl_res in table_responses:
645-
entity_id = tbl_res["entity_id"]
646-
while entity_id != batch[entity_idx]:
647-
batch_result.append((None, None))
648-
entity_idx += 1
649-
res = {}
650-
for feature_name, value_bin in tbl_res["values"].items():
648+
DynamoDB BatchGetItem doesn't return items in a particular order,
649+
so we use a dictionary for O(1) lookup instead of O(n log n) sorting.
650+
651+
This method:
652+
- Uses dictionary lookup instead of sorting for response ordering
653+
- Pre-allocates the result list with None values
654+
- Minimizes object creation in the hot path
655+
656+
Args:
657+
table_name: Name of the DynamoDB table
658+
response: Raw response from DynamoDB batch_get_item
659+
batch: List of entity_ids in the order they should be returned
660+
to_tbl_response: Function to transform raw DynamoDB response items
661+
(used for async client responses that need deserialization)
662+
663+
Returns:
664+
List of (timestamp, features) tuples in the same order as batch
665+
"""
666+
responses_data = response.get("Responses")
667+
if not responses_data:
668+
# No responses at all, return all None tuples
669+
return [(None, None)] * len(batch)
670+
671+
table_responses = responses_data.get(table_name)
672+
if not table_responses:
673+
# No responses for this table, return all None tuples
674+
return [(None, None)] * len(batch)
675+
676+
# Build a dictionary for O(1) lookup instead of O(n log n) sorting
677+
response_dict: Dict[str, Any] = {
678+
tbl_res["entity_id"]: tbl_res
679+
for tbl_res in map(to_tbl_response, table_responses)
680+
}
681+
682+
# Pre-allocate result list with None tuples (faster than appending)
683+
batch_size = len(batch)
684+
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [
685+
(None, None)
686+
] * batch_size
687+
688+
# Process each entity in batch order using O(1) dict lookup
689+
for idx, entity_id in enumerate(batch):
690+
tbl_res = response_dict.get(entity_id)
691+
if tbl_res is not None:
692+
# Parse feature values
693+
features: Dict[str, ValueProto] = {}
694+
values_data = tbl_res["values"]
695+
for feature_name, value_bin in values_data.items():
651696
val = ValueProto()
652697
val.ParseFromString(value_bin.value)
653-
res[feature_name] = val
654-
batch_result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
655-
entity_idx += 1
656-
# Not all entities in a batch may have responses
657-
# Pad with remaining values in batch that were not found
658-
batch_size_nones = ((None, None),) * (len(batch) - len(batch_result))
659-
batch_result.extend(batch_size_nones)
660-
return batch_result
698+
features[feature_name] = val
699+
700+
# Parse timestamp and set result
701+
result[idx] = (
702+
datetime.fromisoformat(tbl_res["event_ts"]),
703+
features,
704+
)
705+
706+
return result
661707

662708
@staticmethod
663709
def _to_entity_ids(config: RepoConfig, entity_keys: List[EntityKeyProto]):
710+
"""Convert entity keys to entity IDs."""
664711
return [
665712
compute_entity_id(
666713
entity_key,

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,17 @@ def test_dynamodb_online_store_config_default():
6464
aws_region = "us-west-2"
6565
dynamodb_store_config = DynamoDBOnlineStoreConfig(region=aws_region)
6666
assert dynamodb_store_config.type == "dynamodb"
67-
assert dynamodb_store_config.batch_size == 40
67+
assert dynamodb_store_config.batch_size == 100
6868
assert dynamodb_store_config.endpoint_url is None
6969
assert dynamodb_store_config.region == aws_region
7070
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"
71+
# Verify other optimized defaults
72+
assert dynamodb_store_config.max_pool_connections == 50
73+
assert dynamodb_store_config.keepalive_timeout == 30.0
74+
assert dynamodb_store_config.connect_timeout == 5
75+
assert dynamodb_store_config.read_timeout == 10
76+
assert dynamodb_store_config.total_max_retry_attempts == 3
77+
assert dynamodb_store_config.retry_mode == "adaptive"
7178

7279

7380
def test_dynamodb_online_store_config_custom_params():

0 commit comments

Comments
 (0)