Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: address thread-safety and max_workers issues in parallel DynamoDB reads

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
  • Loading branch information
abhijeet-dhumal committed Mar 4, 2026
commit e4a52027275371666a7ef43173404ac947dd74c0
13 changes: 10 additions & 3 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,14 +507,21 @@ def online_read(
return self._process_batch_get_response(table_name, response, batches[0])

# Execute batch requests in parallel for multiple batches
# Note: boto3 resources are NOT thread-safe, so we create a new resource per thread
def fetch_batch(batch: List[str]) -> Dict[str, Any]:
thread_resource = _initialize_dynamodb_resource(
Comment thread
ntkathole marked this conversation as resolved.
Outdated
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)
batch_entity_ids = self._to_resource_batch_get_payload(
online_config, table_instance.name, batch
online_config, table_name, batch
)
return dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
return thread_resource.batch_get_item(RequestItems=batch_entity_ids)

# Use ThreadPoolExecutor for parallel I/O
max_workers = min(len(batches), batch_size)
# Cap at 10 workers to avoid excessive thread creation
max_workers = min(len(batches), 10)
Comment thread
ntkathole marked this conversation as resolved.
Outdated
with ThreadPoolExecutor(max_workers=max_workers) as executor:
responses = list(executor.map(fetch_batch, batches))
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,3 +879,181 @@ def test_dynamodb_online_store_online_read_order_preservation_across_batches(
# Verify exact order matches
for i, (returned, expected) in enumerate(zip(returned_items, features)):
assert returned[1] == expected, f"Mismatch at index {i}"


@mock_dynamodb
def test_dynamodb_online_store_online_read_small_batch_size(dynamodb_online_store):
    """Test parallel reads with a small batch_size.

    A batch_size of 5 splits 25 entities into 5 batches, exercising the
    parallel fan-out path while checking completeness and ordering.
    """
    small_batch_config = RepoConfig(
        registry=REGISTRY,
        project=PROJECT,
        provider=PROVIDER,
        online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5),
        offline_store=DaskOfflineStoreConfig(),
        entity_key_serialization_version=3,
    )

    sample_count = 25  # batch_size=5 -> 5 parallel batches
    db_table_name = f"{TABLE_NAME}_small_batch"
    create_test_table(PROJECT, db_table_name, REGION)
    samples = create_n_customer_test_samples(n=sample_count)
    insert_data_test_table(samples, PROJECT, db_table_name, REGION)

    columns = list(zip(*samples))
    entity_keys, features = columns[0], columns[1]
    returned_items = dynamodb_online_store.online_read(
        config=small_batch_config,
        table=MockFeatureView(name=db_table_name),
        entity_keys=entity_keys,
    )

    # Every entity must come back, in the same order it was requested.
    assert len(returned_items) == sample_count
    assert [feature_data for _, feature_data in returned_items] == list(features)


@mock_dynamodb
def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store):
    """Test parallel reads with many batches (>10).

    With batch_size=10 and 150 entities there are 15 batches, more than the
    worker cap of 10 — results must still be complete and in request order.
    """
    many_batch_config = RepoConfig(
        registry=REGISTRY,
        project=PROJECT,
        provider=PROVIDER,
        online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=10),
        offline_store=DaskOfflineStoreConfig(),
        entity_key_serialization_version=3,
    )

    sample_count = 150  # batch_size=10 -> 15 batches, above the 10-worker cap
    db_table_name = f"{TABLE_NAME}_many_batches"
    create_test_table(PROJECT, db_table_name, REGION)
    samples = create_n_customer_test_samples(n=sample_count)
    insert_data_test_table(samples, PROJECT, db_table_name, REGION)

    columns = list(zip(*samples))
    entity_keys, features = columns[0], columns[1]
    returned_items = dynamodb_online_store.online_read(
        config=many_batch_config,
        table=MockFeatureView(name=db_table_name),
        entity_keys=entity_keys,
    )

    # Every entity must come back, in the same order it was requested.
    assert len(returned_items) == sample_count
    assert [feature_data for _, feature_data in returned_items] == list(features)


@mock_dynamodb
def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):
    """Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size.

    Bug: the old code computed min(len(batches), batch_size), which starves
    parallelism when batch_size is small.
    Fix: the new code computes min(len(batches), 10).

    With batch_size=5 and 15 batches:
    - OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism)
    - NEW (fixed): max_workers = min(15, 10) = 10 (correct cap)
    """
    # A small batch_size is what distinguishes the buggy and fixed formulas.
    small_batch_config = RepoConfig(
        registry=REGISTRY,
        project=PROJECT,
        provider=PROVIDER,
        online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5),
        offline_store=DaskOfflineStoreConfig(),
        entity_key_serialization_version=3,
    )

    sample_count = 75  # batch_size=5 -> 15 batches
    db_table_name = f"{TABLE_NAME}_max_workers_cap"
    create_test_table(PROJECT, db_table_name, REGION)
    samples = create_n_customer_test_samples(n=sample_count)
    insert_data_test_table(samples, PROJECT, db_table_name, REGION)

    entity_keys = list(zip(*samples))[0]

    with patch(
        "feast.infra.online_stores.dynamodb.ThreadPoolExecutor"
    ) as mock_executor:
        # Stand in for the real executor: map() yields one empty DynamoDB
        # response per batch so online_read can complete without threads.
        fake_responses = iter({"Responses": {}} for _ in range(15))
        mock_executor.return_value.__enter__.return_value.map.return_value = (
            fake_responses
        )

        dynamodb_online_store.online_read(
            config=small_batch_config,
            table=MockFeatureView(name=db_table_name),
            entity_keys=entity_keys,
        )

    # The executor must have been built once, with the 10-worker cap —
    # NOT with batch_size (which would be 5 here).
    mock_executor.assert_called_once()
    actual_workers = mock_executor.call_args.kwargs["max_workers"]
    assert actual_workers == 10, (
        f"Expected max_workers=10 (capped), got {actual_workers}. "
        f"If got 5, the bug is using batch_size instead of 10 as cap."
    )


@mock_dynamodb
def test_dynamodb_online_store_thread_safety_new_resource_per_thread(
    dynamodb_online_store,
):
    """Verify each thread creates its own boto3 resource for thread-safety.

    boto3 resources are NOT thread-safe, so the store must create a new
    resource per thread when fanning batch reads out to a ThreadPoolExecutor.
    """
    config = RepoConfig(
        registry=REGISTRY,
        project=PROJECT,
        provider=PROVIDER,
        online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=50),
        offline_store=DaskOfflineStoreConfig(),
        entity_key_serialization_version=3,
    )

    n_samples = 150  # 3 batches with batch_size=50
    db_table_name = f"{TABLE_NAME}_thread_safety"
    create_test_table(PROJECT, db_table_name, REGION)
    data = create_n_customer_test_samples(n=n_samples)
    insert_data_test_table(data, PROJECT, db_table_name, REGION)

    entity_keys, features, *rest = zip(*data)

    # Track the resource OBJECTS, not their id()s. The per-thread resources
    # are released as soon as each batch finishes, and CPython reuses freed
    # ids — recording bare id() values could spuriously collapse distinct
    # resources into one and make this test flaky. Holding strong references
    # keeps every resource alive, so id() comparisons below are reliable.
    resources_created = []
    original_init = boto3.resource

    def tracking_resource(*args, **kwargs):
        resource = original_init(*args, **kwargs)
        resources_created.append(resource)
        return resource

    with patch.object(boto3, "resource", side_effect=tracking_resource):
        returned_items = dynamodb_online_store.online_read(
            config=config,
            table=MockFeatureView(name=db_table_name),
            entity_keys=entity_keys,
        )

    # Verify results are correct (functional correctness)
    assert len(returned_items) == n_samples

    # Verify multiple resources were created (thread-safety)
    # Each of the 3 batches should create its own resource
    # (plus potentially 1 for _get_dynamodb_resource cache initialization)
    assert len(resources_created) >= 3, (
        f"Expected at least 3 unique resources for 3 batches, "
        f"got {len(resources_created)}"
    )

    # Verify the resources are actually different objects (not reused).
    # Safe to compare ids here: resources_created keeps them all alive.
    unique_resources = {id(resource) for resource in resources_created}
    assert len(unique_resources) >= 3, (
        f"Expected at least 3 unique resource IDs, "
        f"got {len(unique_resources)} unique out of {len(resources_created)}"
    )