Skip to content

Commit f4eebb4

Browse files
abhijeet-dhumalntkathole
authored andcommitted
fix: Improve DynamoDB parallel reads: shared client, configurable workers
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
1 parent 9f2f142 commit f4eebb4

File tree

2 files changed

+54
-43
lines changed

2 files changed

+54
-43
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7777
session_based_auth: bool = False
7878
"""AWS session based client authentication"""
7979

80+
max_read_workers: int = 10
81+
"""Maximum number of parallel threads for batch read operations.
82+
Higher values improve throughput for large batch reads but increase resource usage."""
83+
8084
max_pool_connections: int = 50
8185
"""Max number of connections for async Dynamodb operations.
8286
Increase for high-throughput workloads."""
@@ -481,7 +485,6 @@ def online_read(
481485
online_config.session_based_auth,
482486
)
483487
table_name = _get_table_name(online_config, config, table)
484-
table_instance = dynamodb_resource.Table(table_name)
485488

486489
batch_size = online_config.batch_size
487490
entity_ids = self._to_entity_ids(config, entity_keys)
@@ -501,34 +504,49 @@ def online_read(
501504
# For single batch, no parallelization overhead needed
502505
if len(batches) == 1:
503506
batch_entity_ids = self._to_resource_batch_get_payload(
504-
online_config, table_instance.name, batches[0]
507+
online_config, table_name, batches[0]
505508
)
506509
response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
507510
return self._process_batch_get_response(table_name, response, batches[0])
508511

509512
# Execute batch requests in parallel for multiple batches
510-
# Note: boto3 resources are NOT thread-safe, so we create a new resource per thread
513+
# Note: boto3 clients ARE thread-safe, so we can share a single client
514+
# https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
515+
dynamodb_client = self._get_dynamodb_client(
516+
online_config.region,
517+
online_config.endpoint_url,
518+
online_config.session_based_auth,
519+
)
520+
511521
def fetch_batch(batch: List[str]) -> Dict[str, Any]:
512-
thread_resource = _initialize_dynamodb_resource(
513-
online_config.region,
514-
online_config.endpoint_url,
515-
online_config.session_based_auth,
516-
)
517-
batch_entity_ids = self._to_resource_batch_get_payload(
522+
batch_entity_ids = self._to_client_batch_get_payload(
518523
online_config, table_name, batch
519524
)
520-
return thread_resource.batch_get_item(RequestItems=batch_entity_ids)
525+
return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids)
521526

522527
# Use ThreadPoolExecutor for parallel I/O
523-
# Cap at 10 workers to avoid excessive thread creation
524-
max_workers = min(len(batches), 10)
528+
max_workers = min(len(batches), online_config.max_read_workers)
525529
with ThreadPoolExecutor(max_workers=max_workers) as executor:
526530
responses = list(executor.map(fetch_batch, batches))
527531

528532
# Process responses and merge results in order
533+
# Client responses need deserialization (unlike resource responses)
534+
if self._type_deserializer is None:
535+
self._type_deserializer = TypeDeserializer()
536+
deserialize = self._type_deserializer.deserialize
537+
538+
def to_tbl_resp(raw_client_response):
539+
return {
540+
"entity_id": deserialize(raw_client_response["entity_id"]),
541+
"event_ts": deserialize(raw_client_response["event_ts"]),
542+
"values": deserialize(raw_client_response["values"]),
543+
}
544+
529545
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
530546
for batch, response in zip(batches, responses):
531-
batch_result = self._process_batch_get_response(table_name, response, batch)
547+
batch_result = self._process_batch_get_response(
548+
table_name, response, batch, to_tbl_response=to_tbl_resp
549+
)
532550
result.extend(batch_result)
533551

534552
return result

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -946,15 +946,15 @@ def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store):
946946

947947

948948
@mock_dynamodb
949-
def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):
950-
"""Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size.
949+
def test_dynamodb_online_store_max_workers_capped_at_config(dynamodb_online_store):
950+
"""Verify ThreadPoolExecutor max_workers uses max_read_workers config.
951951
952952
Bug: Old code used min(len(batches), batch_size) which fails with small batch_size.
953-
Fix: New code uses min(len(batches), 10) to ensure proper parallelization.
953+
Fix: New code uses min(len(batches), max_read_workers) for proper parallelization.
954954
955955
This test uses batch_size=5 with 15 batches to expose the bug:
956956
- OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism)
957-
- NEW (fixed): max_workers = min(15, 10) = 10 (correct cap)
957+
- NEW (fixed): max_workers = min(15, 10) = 10 (uses max_read_workers default)
958958
"""
959959
# Use small batch_size to expose the bug
960960
small_batch_config = RepoConfig(
@@ -998,13 +998,14 @@ def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):
998998

999999

10001000
@mock_dynamodb
1001-
def test_dynamodb_online_store_thread_safety_new_resource_per_thread(
1001+
def test_dynamodb_online_store_thread_safety_uses_shared_client(
10021002
dynamodb_online_store,
10031003
):
1004-
"""Verify each thread creates its own boto3 resource for thread-safety.
1004+
"""Verify multi-batch reads use a shared thread-safe boto3 client.
10051005
1006-
boto3 resources are NOT thread-safe, so we must create a new resource
1007-
per thread when using ThreadPoolExecutor.
1006+
boto3 clients ARE thread-safe, so we share a single client across threads
1007+
for better performance (avoids creating new sessions per thread).
1008+
https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
10081009
"""
10091010
config = RepoConfig(
10101011
registry=REGISTRY,
@@ -1023,16 +1024,16 @@ def test_dynamodb_online_store_thread_safety_new_resource_per_thread(
10231024

10241025
entity_keys, features, *rest = zip(*data)
10251026

1026-
# Track resources created to verify thread-safety
1027-
resources_created = []
1028-
original_init = boto3.resource
1027+
# Track clients created to verify thread-safety via shared client
1028+
clients_created = []
1029+
original_client = boto3.client
10291030

1030-
def tracking_resource(*args, **kwargs):
1031-
resource = original_init(*args, **kwargs)
1032-
resources_created.append(id(resource))
1033-
return resource
1031+
def tracking_client(*args, **kwargs):
1032+
client = original_client(*args, **kwargs)
1033+
clients_created.append(id(client))
1034+
return client
10341035

1035-
with patch.object(boto3, "resource", side_effect=tracking_resource):
1036+
with patch.object(boto3, "client", side_effect=tracking_client):
10361037
returned_items = dynamodb_online_store.online_read(
10371038
config=config,
10381039
table=MockFeatureView(name=db_table_name),
@@ -1042,18 +1043,10 @@ def tracking_resource(*args, **kwargs):
10421043
# Verify results are correct (functional correctness)
10431044
assert len(returned_items) == n_samples
10441045

1045-
# Verify multiple resources were created (thread-safety)
1046-
# Each of the 3 batches should create its own resource
1047-
# (plus potentially 1 for _get_dynamodb_resource cache initialization)
1048-
assert len(resources_created) >= 3, (
1049-
f"Expected at least 3 unique resources for 3 batches, "
1050-
f"got {len(resources_created)}"
1051-
)
1052-
1053-
# Verify the resources are actually different objects (not reused)
1054-
# At least the batch resources should be unique
1055-
unique_resources = set(resources_created)
1056-
assert len(unique_resources) >= 3, (
1057-
f"Expected at least 3 unique resource IDs, "
1058-
f"got {len(unique_resources)} unique out of {len(resources_created)}"
1046+
# Verify only one client was created (shared across threads)
1047+
# The client is cached and reused for all batch requests
1048+
dynamodb_clients = [c for c in clients_created]
1049+
assert len(set(dynamodb_clients)) == 1, (
1050+
f"Expected 1 shared client for thread-safety, "
1051+
f"got {len(set(dynamodb_clients))} unique clients"
10591052
)

0 commit comments

Comments
 (0)