Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test: add unit tests for DynamoDB parallel batch reads
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
  • Loading branch information
abhijeet-dhumal committed Mar 4, 2026
commit c694ed9c56a88decff5309c10570bfdab3dc4155
Original file line number Diff line number Diff line change
Expand Up @@ -780,3 +780,102 @@ def test_dynamodb_update_online_store_int_list(repo_config, dynamodb_online_stor
assert len(result) == 1
scores = result[0][1]["scores"]
assert _extract_int32_list(scores) == [10, 20, 30]


@mock_dynamodb
def test_dynamodb_online_store_online_read_empty_entities(
repo_config, dynamodb_online_store
):
"""Test DynamoDBOnlineStore online_read with empty entity list."""
db_table_name = f"{TABLE_NAME}_empty_entities"
create_test_table(PROJECT, db_table_name, REGION)

returned_items = dynamodb_online_store.online_read(
config=repo_config,
table=MockFeatureView(name=db_table_name),
entity_keys=[],
)
assert returned_items == []


@mock_dynamodb
def test_dynamodb_online_store_online_read_parallel_batches(
repo_config, dynamodb_online_store
):
"""Test DynamoDBOnlineStore online_read with multiple batches (parallel execution).

With batch_size=100 (default), 250 entities should create 3 batches
that are executed in parallel via ThreadPoolExecutor.
"""
n_samples = 250
db_table_name = f"{TABLE_NAME}_parallel_batches"
create_test_table(PROJECT, db_table_name, REGION)
data = create_n_customer_test_samples(n=n_samples)
insert_data_test_table(data, PROJECT, db_table_name, REGION)

entity_keys, features, *rest = zip(*data)
returned_items = dynamodb_online_store.online_read(
config=repo_config,
table=MockFeatureView(name=db_table_name),
entity_keys=entity_keys,
)

# Verify all items returned
assert len(returned_items) == n_samples
# Verify order is preserved
assert [item[1] for item in returned_items] == list(features)


@mock_dynamodb
def test_dynamodb_online_store_online_read_single_batch_no_parallel(
repo_config, dynamodb_online_store
):
"""Test DynamoDBOnlineStore online_read with single batch (no parallelization).

With batch_size=100, 50 entities should use single batch path
without ThreadPoolExecutor overhead.
"""
n_samples = 50
db_table_name = f"{TABLE_NAME}_single_batch"
create_test_table(PROJECT, db_table_name, REGION)
data = create_n_customer_test_samples(n=n_samples)
insert_data_test_table(data, PROJECT, db_table_name, REGION)

entity_keys, features, *rest = zip(*data)
returned_items = dynamodb_online_store.online_read(
config=repo_config,
table=MockFeatureView(name=db_table_name),
entity_keys=entity_keys,
)

assert len(returned_items) == n_samples
assert [item[1] for item in returned_items] == list(features)


@mock_dynamodb
def test_dynamodb_online_store_online_read_order_preservation_across_batches(
repo_config, dynamodb_online_store
):
"""Test that entity order is preserved across parallel batch reads.

This is critical: parallel execution must not change the order of results.
"""
n_samples = 150 # 2 batches with batch_size=100
db_table_name = f"{TABLE_NAME}_order_preservation"
create_test_table(PROJECT, db_table_name, REGION)
data = create_n_customer_test_samples(n=n_samples)
insert_data_test_table(data, PROJECT, db_table_name, REGION)

entity_keys, features, *rest = zip(*data)

# Read multiple times to verify consistent ordering
for _ in range(3):
returned_items = dynamodb_online_store.online_read(
config=repo_config,
table=MockFeatureView(name=db_table_name),
entity_keys=entity_keys,
)
assert len(returned_items) == n_samples
# Verify exact order matches
for i, (returned, expected) in enumerate(zip(returned_items, features)):
assert returned[1] == expected, f"Mismatch at index {i}"