diff --git a/docs/reference/online-stores/dynamodb.md b/docs/reference/online-stores/dynamodb.md index 344caccac1d..ec0104172fb 100644 --- a/docs/reference/online-stores/dynamodb.md +++ b/docs/reference/online-stores/dynamodb.md @@ -22,6 +22,47 @@ online_store: The full set of configuration options is available in [DynamoDBOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.dynamodb.DynamoDBOnlineStoreConfig). +## Configuration + +Below is an example with performance tuning options: + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: aws +online_store: + type: dynamodb + region: us-west-2 + batch_size: 100 + max_read_workers: 10 + consistent_reads: false +``` +{% endcode %} + +### Configuration Options + +| Option | Type | Default | Description | +| ------ | ---- | ------- | ----------- | +| `region` | string | | AWS region for DynamoDB | +| `table_name_template` | string | `{project}.{table_name}` | Template for table names | +| `batch_size` | int | `100` | Number of items per BatchGetItem/BatchWriteItem request (max 100) | +| `max_read_workers` | int | `10` | Maximum parallel threads for batch read operations. Higher values improve throughput for large batch reads but increase resource usage | +| `consistent_reads` | bool | `false` | Whether to use strongly consistent reads (higher latency, guaranteed latest data) | +| `tags` | dict | `null` | AWS resource tags added to each table | +| `session_based_auth` | bool | `false` | Use AWS session-based client authentication | + +### Performance Tuning + +**Parallel Batch Reads**: When reading features for many entities, DynamoDB's BatchGetItem is limited to 100 items per request. For 500 entities, this requires 5 batch requests. 
The `max_read_workers` option controls how many of these batches execute in parallel: + +- **Sequential (old behavior)**: 5 batches × 10ms = 50ms total +- **Parallel (with `max_read_workers: 10`)**: 5 batches in parallel ≈ 10ms total + +For high-throughput workloads with large entity counts, increase `max_read_workers` (up to 20-30) based on your DynamoDB capacity and network conditions. + +**Batch Size**: Increase `batch_size` up to 100 to reduce the number of API calls. However, larger batches may hit DynamoDB's 16MB response limit for tables with large feature values. + ## Permissions Feast requires the following permissions in order to execute commands for DynamoDB online store: diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 9a2d57a3278..814058c77e5 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -16,6 +16,7 @@ import itertools import logging from collections import OrderedDict, defaultdict +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union @@ -76,6 +77,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): session_based_auth: bool = False """AWS session based client authentication""" + max_read_workers: int = 10 + """Maximum number of parallel threads for batch read operations. + Higher values improve throughput for large batch reads but increase resource usage.""" + max_pool_connections: int = 50 """Max number of connections for async Dynamodb operations. 
Increase for high-throughput workloads.""" @@ -479,33 +484,71 @@ def online_read( online_config.endpoint_url, online_config.session_based_auth, ) - table_instance = dynamodb_resource.Table( - _get_table_name(online_config, config, table) - ) + table_name = _get_table_name(online_config, config, table) batch_size = online_config.batch_size entity_ids = self._to_entity_ids(config, entity_keys) - entity_ids_iter = iter(entity_ids) - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + # Split entity_ids into batches upfront + batches: List[List[str]] = [] + entity_ids_iter = iter(entity_ids) while True: batch = list(itertools.islice(entity_ids_iter, batch_size)) - - # No more items to insert - if len(batch) == 0: + if not batch: break + batches.append(batch) + + if not batches: + return [] + + # For single batch, no parallelization overhead needed + if len(batches) == 1: batch_entity_ids = self._to_resource_batch_get_payload( - online_config, table_instance.name, batch + online_config, table_name, batches[0] ) - response = dynamodb_resource.batch_get_item( - RequestItems=batch_entity_ids, + response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) + return self._process_batch_get_response(table_name, response, batches[0]) + + # Execute batch requests in parallel for multiple batches + # Note: boto3 clients ARE thread-safe, so we can share a single client + # https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients + dynamodb_client = self._get_dynamodb_client( + online_config.region, + online_config.endpoint_url, + online_config.session_based_auth, + ) + + def fetch_batch(batch: List[str]) -> Dict[str, Any]: + batch_entity_ids = self._to_client_batch_get_payload( + online_config, table_name, batch ) + return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids) + + # Use ThreadPoolExecutor for parallel I/O + max_workers = min(len(batches), 
online_config.max_read_workers) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + responses = list(executor.map(fetch_batch, batches)) + + # Process responses and merge results in order + # Client responses need deserialization (unlike resource responses) + if self._type_deserializer is None: + self._type_deserializer = TypeDeserializer() + deserialize = self._type_deserializer.deserialize + + def to_tbl_resp(raw_client_response): + return { + "entity_id": deserialize(raw_client_response["entity_id"]), + "event_ts": deserialize(raw_client_response["event_ts"]), + "values": deserialize(raw_client_response["values"]), + } + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for batch, response in zip(batches, responses): batch_result = self._process_batch_get_response( - table_instance.name, - response, - batch, + table_name, response, batch, to_tbl_response=to_tbl_resp ) result.extend(batch_result) + return result async def online_read_async( diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 6dd8a99f884..7e5558e19d7 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -780,3 +780,273 @@ def test_dynamodb_update_online_store_int_list(repo_config, dynamodb_online_stor assert len(result) == 1 scores = result[0][1]["scores"] assert _extract_int32_list(scores) == [10, 20, 30] + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_empty_entities( + repo_config, dynamodb_online_store +): + """Test DynamoDBOnlineStore online_read with empty entity list.""" + db_table_name = f"{TABLE_NAME}_empty_entities" + create_test_table(PROJECT, db_table_name, REGION) + + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=[], + ) + 
assert returned_items == [] + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_parallel_batches( + repo_config, dynamodb_online_store +): + """Test DynamoDBOnlineStore online_read with multiple batches (parallel execution). + + With batch_size=100 (default), 250 entities should create 3 batches + that are executed in parallel via ThreadPoolExecutor. + """ + n_samples = 250 + db_table_name = f"{TABLE_NAME}_parallel_batches" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + # Verify all items returned + assert len(returned_items) == n_samples + # Verify order is preserved + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_single_batch_no_parallel( + repo_config, dynamodb_online_store +): + """Test DynamoDBOnlineStore online_read with single batch (no parallelization). + + With batch_size=100, 50 entities should use single batch path + without ThreadPoolExecutor overhead. 
+ """ + n_samples = 50 + db_table_name = f"{TABLE_NAME}_single_batch" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + assert len(returned_items) == n_samples + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_order_preservation_across_batches( + repo_config, dynamodb_online_store +): + """Test that entity order is preserved across parallel batch reads. + + This is critical: parallel execution must not change the order of results. + """ + n_samples = 150 # 2 batches with batch_size=100 + db_table_name = f"{TABLE_NAME}_order_preservation" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + + # Read multiple times to verify consistent ordering + for _ in range(3): + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + assert len(returned_items) == n_samples + # Verify exact order matches + for i, (returned, expected) in enumerate(zip(returned_items, features)): + assert returned[1] == expected, f"Mismatch at index {i}" + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_small_batch_size(dynamodb_online_store): + """Test parallel reads with small batch_size. + + Verifies correctness with small batch sizes that create multiple batches. 
+ """ + small_batch_config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 25 # 5 batches with batch_size=5 + db_table_name = f"{TABLE_NAME}_small_batch" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=small_batch_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + assert len(returned_items) == n_samples + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store): + """Test parallel reads with many batches (>10). + + Verifies correctness when number of batches exceeds max_workers cap. 
+ """ + many_batch_config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=10), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 150 # 15 batches with batch_size=10 + db_table_name = f"{TABLE_NAME}_many_batches" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=many_batch_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + assert len(returned_items) == n_samples + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_max_workers_capped_at_config(dynamodb_online_store): + """Verify ThreadPoolExecutor max_workers uses max_read_workers config. + + Bug: Old code used min(len(batches), batch_size) which fails with small batch_size. + Fix: New code uses min(len(batches), max_read_workers) for proper parallelization. 
+ + This test uses batch_size=5 with 15 batches to expose the bug: + - OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism) + - NEW (fixed): max_workers = min(15, 10) = 10 (uses max_read_workers default) + """ + # Use small batch_size to expose the bug + small_batch_config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 75 # 15 batches with batch_size=5 + db_table_name = f"{TABLE_NAME}_max_workers_cap" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + + with patch( + "feast.infra.online_stores.dynamodb.ThreadPoolExecutor" + ) as mock_executor: + # Configure mock to work like real ThreadPoolExecutor + mock_executor.return_value.__enter__.return_value.map.return_value = iter( + [{"Responses": {}} for _ in range(15)] + ) + + dynamodb_online_store.online_read( + config=small_batch_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + # Verify ThreadPoolExecutor was called with max_workers=10 (capped at 10, NOT batch_size=5) + mock_executor.assert_called_once() + call_kwargs = mock_executor.call_args + assert call_kwargs[1]["max_workers"] == 10, ( + f"Expected max_workers=10 (capped), got {call_kwargs[1]['max_workers']}. " + f"If got 5, the bug is using batch_size instead of 10 as cap." + ) + + +@mock_dynamodb +def test_dynamodb_online_store_thread_safety_uses_shared_client( + dynamodb_online_store, +): + """Verify multi-batch reads use a shared thread-safe boto3 client. + + boto3 clients ARE thread-safe, so we share a single client across threads + for better performance (avoids creating new sessions per thread). 
+ https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients + """ + config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=50), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 150 # 3 batches + db_table_name = f"{TABLE_NAME}_thread_safety" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + + # Track clients created to verify thread-safety via shared client + clients_created = [] + original_client = boto3.client + + def tracking_client(*args, **kwargs): + client = original_client(*args, **kwargs) + clients_created.append(id(client)) + return client + + with patch.object(boto3, "client", side_effect=tracking_client): + returned_items = dynamodb_online_store.online_read( + config=config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + # Verify results are correct (functional correctness) + assert len(returned_items) == n_samples + + # Verify only one client was created (shared across threads) + # The client is cached and reused for all batch requests + dynamodb_clients = [c for c in clients_created] + assert len(set(dynamodb_clients)) == 1, ( + f"Expected 1 shared client for thread-safety, " + f"got {len(set(dynamodb_clients))} unique clients" + )