Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: Improve DynamoDB parallel reads: shared client, configurable workers

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
  • Loading branch information
abhijeet-dhumal committed Mar 4, 2026
commit 342587d03c79081516eeeeb4e577fe7b176feec8
44 changes: 31 additions & 13 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
session_based_auth: bool = False
"""AWS session based client authentication"""

max_read_workers: int = 10
"""Maximum number of parallel threads for batch read operations.
Higher values improve throughput for large batch reads but increase resource usage."""

max_pool_connections: int = 50
"""Max number of connections for async DynamoDB operations.
Increase for high-throughput workloads."""
Expand Down Expand Up @@ -481,7 +485,6 @@ def online_read(
online_config.session_based_auth,
)
table_name = _get_table_name(online_config, config, table)
table_instance = dynamodb_resource.Table(table_name)

batch_size = online_config.batch_size
entity_ids = self._to_entity_ids(config, entity_keys)
Expand All @@ -501,34 +504,49 @@ def online_read(
# For single batch, no parallelization overhead needed
if len(batches) == 1:
batch_entity_ids = self._to_resource_batch_get_payload(
online_config, table_instance.name, batches[0]
online_config, table_name, batches[0]
)
response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
return self._process_batch_get_response(table_name, response, batches[0])

# Execute batch requests in parallel for multiple batches
# Note: boto3 resources are NOT thread-safe, so we create a new resource per thread
# Note: boto3 clients ARE thread-safe, so we can share a single client
# https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
dynamodb_client = self._get_dynamodb_client(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)

def fetch_batch(batch: List[str]) -> Dict[str, Any]:
thread_resource = _initialize_dynamodb_resource(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)
batch_entity_ids = self._to_resource_batch_get_payload(
batch_entity_ids = self._to_client_batch_get_payload(
online_config, table_name, batch
)
return thread_resource.batch_get_item(RequestItems=batch_entity_ids)
return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids)

# Use ThreadPoolExecutor for parallel I/O
# Cap at max_read_workers to avoid excessive thread creation
max_workers = min(len(batches), 10)
max_workers = min(len(batches), online_config.max_read_workers)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
responses = list(executor.map(fetch_batch, batches))
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

# Process responses and merge results in order
# Client responses need deserialization (unlike resource responses)
if self._type_deserializer is None:
self._type_deserializer = TypeDeserializer()
deserialize = self._type_deserializer.deserialize

def to_tbl_resp(raw_client_response):
return {
"entity_id": deserialize(raw_client_response["entity_id"]),
"event_ts": deserialize(raw_client_response["event_ts"]),
"values": deserialize(raw_client_response["values"]),
}

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for batch, response in zip(batches, responses):
batch_result = self._process_batch_get_response(table_name, response, batch)
batch_result = self._process_batch_get_response(
table_name, response, batch, to_tbl_response=to_tbl_resp
)
result.extend(batch_result)

return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,15 +946,15 @@ def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store):


@mock_dynamodb
def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):
"""Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size.
def test_dynamodb_online_store_max_workers_capped_at_config(dynamodb_online_store):
"""Verify ThreadPoolExecutor max_workers uses max_read_workers config.

Bug: Old code used min(len(batches), batch_size) which fails with small batch_size.
Fix: New code uses min(len(batches), 10) to ensure proper parallelization.
Fix: New code uses min(len(batches), max_read_workers) for proper parallelization.

This test uses batch_size=5 with 15 batches to expose the bug:
- OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism)
- NEW (fixed): max_workers = min(15, 10) = 10 (correct cap)
- NEW (fixed): max_workers = min(15, 10) = 10 (uses max_read_workers default)
"""
# Use small batch_size to expose the bug
small_batch_config = RepoConfig(
Expand Down Expand Up @@ -998,13 +998,14 @@ def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):


@mock_dynamodb
def test_dynamodb_online_store_thread_safety_new_resource_per_thread(
def test_dynamodb_online_store_thread_safety_uses_shared_client(
dynamodb_online_store,
):
"""Verify each thread creates its own boto3 resource for thread-safety.
"""Verify multi-batch reads use a shared thread-safe boto3 client.

boto3 resources are NOT thread-safe, so we must create a new resource
per thread when using ThreadPoolExecutor.
boto3 clients ARE thread-safe, so we share a single client across threads
for better performance (avoids creating new sessions per thread).
https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
"""
config = RepoConfig(
registry=REGISTRY,
Expand All @@ -1023,16 +1024,16 @@ def test_dynamodb_online_store_thread_safety_new_resource_per_thread(

entity_keys, features, *rest = zip(*data)

# Track resources created to verify thread-safety
resources_created = []
original_init = boto3.resource
# Track clients created to verify thread-safety via shared client
clients_created = []
original_client = boto3.client

def tracking_resource(*args, **kwargs):
resource = original_init(*args, **kwargs)
resources_created.append(id(resource))
return resource
def tracking_client(*args, **kwargs):
client = original_client(*args, **kwargs)
clients_created.append(id(client))
return client

with patch.object(boto3, "resource", side_effect=tracking_resource):
with patch.object(boto3, "client", side_effect=tracking_client):
returned_items = dynamodb_online_store.online_read(
config=config,
table=MockFeatureView(name=db_table_name),
Expand All @@ -1042,18 +1043,10 @@ def tracking_resource(*args, **kwargs):
# Verify results are correct (functional correctness)
assert len(returned_items) == n_samples

# Verify multiple resources were created (thread-safety)
# Each of the 3 batches should create its own resource
# (plus potentially 1 for _get_dynamodb_resource cache initialization)
assert len(resources_created) >= 3, (
f"Expected at least 3 unique resources for 3 batches, "
f"got {len(resources_created)}"
)

# Verify the resources are actually different objects (not reused)
# At least the batch resources should be unique
unique_resources = set(resources_created)
assert len(unique_resources) >= 3, (
f"Expected at least 3 unique resource IDs, "
f"got {len(unique_resources)} unique out of {len(resources_created)}"
# Verify only one client was created (shared across threads)
# The client is cached and reused for all batch requests
dynamodb_clients = [c for c in clients_created]
assert len(set(dynamodb_clients)) == 1, (
f"Expected 1 shared client for thread-safety, "
f"got {len(set(dynamodb_clients))} unique clients"
)