From 6e65f4d50e6321ab32a821c3abf6710f81a6f8d9 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 25 Feb 2026 20:16:48 +0530 Subject: [PATCH 1/6] perf: Parallelize DynamoDB batch reads in sync online_read Signed-off-by: abhijeet-dhumal --- .../feast/infra/online_stores/dynamodb.py | 50 +++++++++++++------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 9a2d57a3278..2d49a0846cd 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -16,6 +16,7 @@ import itertools import logging from collections import OrderedDict, defaultdict +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union @@ -479,33 +480,50 @@ def online_read( online_config.endpoint_url, online_config.session_based_auth, ) - table_instance = dynamodb_resource.Table( - _get_table_name(online_config, config, table) - ) + table_name = _get_table_name(online_config, config, table) + table_instance = dynamodb_resource.Table(table_name) batch_size = online_config.batch_size entity_ids = self._to_entity_ids(config, entity_keys) - entity_ids_iter = iter(entity_ids) - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + # Split entity_ids into batches upfront + batches: List[List[str]] = [] + entity_ids_iter = iter(entity_ids) while True: batch = list(itertools.islice(entity_ids_iter, batch_size)) - - # No more items to insert - if len(batch) == 0: + if not batch: break + batches.append(batch) + + if not batches: + return [] + + # For single batch, no parallelization overhead needed + if len(batches) == 1: batch_entity_ids = self._to_resource_batch_get_payload( - online_config, table_instance.name, batch - ) - response = dynamodb_resource.batch_get_item( - RequestItems=batch_entity_ids, + 
online_config, table_instance.name, batches[0] ) - batch_result = self._process_batch_get_response( - table_instance.name, - response, - batch, + response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) + return self._process_batch_get_response(table_name, response, batches[0]) + + # Execute batch requests in parallel for multiple batches + def fetch_batch(batch: List[str]) -> Dict[str, Any]: + batch_entity_ids = self._to_resource_batch_get_payload( + online_config, table_instance.name, batch ) + return dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) + + # Use ThreadPoolExecutor for parallel I/O + max_workers = min(len(batches), batch_size) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + responses = list(executor.map(fetch_batch, batches)) + + # Process responses and merge results in order + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for batch, response in zip(batches, responses): + batch_result = self._process_batch_get_response(table_name, response, batch) result.extend(batch_result) + return result async def online_read_async( From c694ed9c56a88decff5309c10570bfdab3dc4155 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 25 Feb 2026 20:32:25 +0530 Subject: [PATCH 2/6] test: add unit tests for DynamoDB parallel batch reads Signed-off-by: abhijeet-dhumal --- .../test_dynamodb_online_store.py | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 6dd8a99f884..e76fb5e70e5 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -780,3 +780,102 @@ def test_dynamodb_update_online_store_int_list(repo_config, dynamodb_online_stor assert len(result) == 1 scores = result[0][1]["scores"] assert 
_extract_int32_list(scores) == [10, 20, 30] + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_empty_entities( + repo_config, dynamodb_online_store +): + """Test DynamoDBOnlineStore online_read with empty entity list.""" + db_table_name = f"{TABLE_NAME}_empty_entities" + create_test_table(PROJECT, db_table_name, REGION) + + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=[], + ) + assert returned_items == [] + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_parallel_batches( + repo_config, dynamodb_online_store +): + """Test DynamoDBOnlineStore online_read with multiple batches (parallel execution). + + With batch_size=100 (default), 250 entities should create 3 batches + that are executed in parallel via ThreadPoolExecutor. + """ + n_samples = 250 + db_table_name = f"{TABLE_NAME}_parallel_batches" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + # Verify all items returned + assert len(returned_items) == n_samples + # Verify order is preserved + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_single_batch_no_parallel( + repo_config, dynamodb_online_store +): + """Test DynamoDBOnlineStore online_read with single batch (no parallelization). + + With batch_size=100, 50 entities should use single batch path + without ThreadPoolExecutor overhead. 
+ """ + n_samples = 50 + db_table_name = f"{TABLE_NAME}_single_batch" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + assert len(returned_items) == n_samples + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_order_preservation_across_batches( + repo_config, dynamodb_online_store +): + """Test that entity order is preserved across parallel batch reads. + + This is critical: parallel execution must not change the order of results. + """ + n_samples = 150 # 2 batches with batch_size=100 + db_table_name = f"{TABLE_NAME}_order_preservation" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + + # Read multiple times to verify consistent ordering + for _ in range(3): + returned_items = dynamodb_online_store.online_read( + config=repo_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + assert len(returned_items) == n_samples + # Verify exact order matches + for i, (returned, expected) in enumerate(zip(returned_items, features)): + assert returned[1] == expected, f"Mismatch at index {i}" From e4a52027275371666a7ef43173404ac947dd74c0 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 25 Feb 2026 20:57:10 +0530 Subject: [PATCH 3/6] fix: address thread-safety and max_workers issues in parallel DynamoDB reads Signed-off-by: abhijeet-dhumal --- .../feast/infra/online_stores/dynamodb.py | 13 +- .../test_dynamodb_online_store.py | 178 ++++++++++++++++++ 2 files changed, 188 
insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 2d49a0846cd..4db312f9304 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -507,14 +507,21 @@ def online_read( return self._process_batch_get_response(table_name, response, batches[0]) # Execute batch requests in parallel for multiple batches + # Note: boto3 resources are NOT thread-safe, so we create a new resource per thread def fetch_batch(batch: List[str]) -> Dict[str, Any]: + thread_resource = _initialize_dynamodb_resource( + online_config.region, + online_config.endpoint_url, + online_config.session_based_auth, + ) batch_entity_ids = self._to_resource_batch_get_payload( - online_config, table_instance.name, batch + online_config, table_name, batch ) - return dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) + return thread_resource.batch_get_item(RequestItems=batch_entity_ids) # Use ThreadPoolExecutor for parallel I/O - max_workers = min(len(batches), batch_size) + # Cap at 10 workers to avoid excessive thread creation + max_workers = min(len(batches), 10) with ThreadPoolExecutor(max_workers=max_workers) as executor: responses = list(executor.map(fetch_batch, batches)) diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index e76fb5e70e5..79da514d767 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -879,3 +879,181 @@ def test_dynamodb_online_store_online_read_order_preservation_across_batches( # Verify exact order matches for i, (returned, expected) in enumerate(zip(returned_items, features)): assert returned[1] == expected, f"Mismatch at index {i}" + + +@mock_dynamodb +def 
test_dynamodb_online_store_online_read_small_batch_size(dynamodb_online_store): + """Test parallel reads with small batch_size. + + Verifies correctness with small batch sizes that create multiple batches. + """ + small_batch_config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 25 # 5 batches with batch_size=5 + db_table_name = f"{TABLE_NAME}_small_batch" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=small_batch_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + assert len(returned_items) == n_samples + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store): + """Test parallel reads with many batches (>10). + + Verifies correctness when number of batches exceeds max_workers cap. 
+ """ + many_batch_config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=10), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 150 # 15 batches with batch_size=10 + db_table_name = f"{TABLE_NAME}_many_batches" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + returned_items = dynamodb_online_store.online_read( + config=many_batch_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + assert len(returned_items) == n_samples + assert [item[1] for item in returned_items] == list(features) + + +@mock_dynamodb +def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store): + """Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size. + + Bug: Old code used min(len(batches), batch_size) which fails with small batch_size. + Fix: New code uses min(len(batches), 10) to ensure proper parallelization. 
+ + This test uses batch_size=5 with 15 batches to expose the bug: + - OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism) + - NEW (fixed): max_workers = min(15, 10) = 10 (correct cap) + """ + # Use small batch_size to expose the bug + small_batch_config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 75 # 15 batches with batch_size=5 + db_table_name = f"{TABLE_NAME}_max_workers_cap" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + + with patch( + "feast.infra.online_stores.dynamodb.ThreadPoolExecutor" + ) as mock_executor: + # Configure mock to work like real ThreadPoolExecutor + mock_executor.return_value.__enter__.return_value.map.return_value = iter( + [{"Responses": {}} for _ in range(15)] + ) + + dynamodb_online_store.online_read( + config=small_batch_config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + # Verify ThreadPoolExecutor was called with max_workers=10 (capped at 10, NOT batch_size=5) + mock_executor.assert_called_once() + call_kwargs = mock_executor.call_args + assert call_kwargs[1]["max_workers"] == 10, ( + f"Expected max_workers=10 (capped), got {call_kwargs[1]['max_workers']}. " + f"If got 5, the bug is using batch_size instead of 10 as cap." + ) + + +@mock_dynamodb +def test_dynamodb_online_store_thread_safety_new_resource_per_thread( + dynamodb_online_store, +): + """Verify each thread creates its own boto3 resource for thread-safety. + + boto3 resources are NOT thread-safe, so we must create a new resource + per thread when using ThreadPoolExecutor. 
+ """ + config = RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=50), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + n_samples = 150 # 3 batches + db_table_name = f"{TABLE_NAME}_thread_safety" + create_test_table(PROJECT, db_table_name, REGION) + data = create_n_customer_test_samples(n=n_samples) + insert_data_test_table(data, PROJECT, db_table_name, REGION) + + entity_keys, features, *rest = zip(*data) + + # Track resources created to verify thread-safety + resources_created = [] + original_init = boto3.resource + + def tracking_resource(*args, **kwargs): + resource = original_init(*args, **kwargs) + resources_created.append(id(resource)) + return resource + + with patch.object(boto3, "resource", side_effect=tracking_resource): + returned_items = dynamodb_online_store.online_read( + config=config, + table=MockFeatureView(name=db_table_name), + entity_keys=entity_keys, + ) + + # Verify results are correct (functional correctness) + assert len(returned_items) == n_samples + + # Verify multiple resources were created (thread-safety) + # Each of the 3 batches should create its own resource + # (plus potentially 1 for _get_dynamodb_resource cache initialization) + assert len(resources_created) >= 3, ( + f"Expected at least 3 unique resources for 3 batches, " + f"got {len(resources_created)}" + ) + + # Verify the resources are actually different objects (not reused) + # At least the batch resources should be unique + unique_resources = set(resources_created) + assert len(unique_resources) >= 3, ( + f"Expected at least 3 unique resource IDs, " + f"got {len(unique_resources)} unique out of {len(resources_created)}" + ) From 342587d03c79081516eeeeb4e577fe7b176feec8 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Mon, 2 Mar 2026 11:50:37 +0530 Subject: [PATCH 4/6] fix: Improve DynamoDB parallel reads: shared client, configurable workers 
Signed-off-by: abhijeet-dhumal --- .../feast/infra/online_stores/dynamodb.py | 44 ++++++++++----- .../test_dynamodb_online_store.py | 53 ++++++++----------- 2 files changed, 54 insertions(+), 43 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 4db312f9304..814058c77e5 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -77,6 +77,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): session_based_auth: bool = False """AWS session based client authentication""" + max_read_workers: int = 10 + """Maximum number of parallel threads for batch read operations. + Higher values improve throughput for large batch reads but increase resource usage.""" + max_pool_connections: int = 50 """Max number of connections for async Dynamodb operations. Increase for high-throughput workloads.""" @@ -481,7 +485,6 @@ def online_read( online_config.session_based_auth, ) table_name = _get_table_name(online_config, config, table) - table_instance = dynamodb_resource.Table(table_name) batch_size = online_config.batch_size entity_ids = self._to_entity_ids(config, entity_keys) @@ -501,34 +504,49 @@ def online_read( # For single batch, no parallelization overhead needed if len(batches) == 1: batch_entity_ids = self._to_resource_batch_get_payload( - online_config, table_instance.name, batches[0] + online_config, table_name, batches[0] ) response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) return self._process_batch_get_response(table_name, response, batches[0]) # Execute batch requests in parallel for multiple batches - # Note: boto3 resources are NOT thread-safe, so we create a new resource per thread + # Note: boto3 clients ARE thread-safe, so we can share a single client + # https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients + dynamodb_client = 
self._get_dynamodb_client( + online_config.region, + online_config.endpoint_url, + online_config.session_based_auth, + ) + def fetch_batch(batch: List[str]) -> Dict[str, Any]: - thread_resource = _initialize_dynamodb_resource( - online_config.region, - online_config.endpoint_url, - online_config.session_based_auth, - ) - batch_entity_ids = self._to_resource_batch_get_payload( + batch_entity_ids = self._to_client_batch_get_payload( online_config, table_name, batch ) - return thread_resource.batch_get_item(RequestItems=batch_entity_ids) + return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids) # Use ThreadPoolExecutor for parallel I/O - # Cap at 10 workers to avoid excessive thread creation - max_workers = min(len(batches), 10) + max_workers = min(len(batches), online_config.max_read_workers) with ThreadPoolExecutor(max_workers=max_workers) as executor: responses = list(executor.map(fetch_batch, batches)) # Process responses and merge results in order + # Client responses need deserialization (unlike resource responses) + if self._type_deserializer is None: + self._type_deserializer = TypeDeserializer() + deserialize = self._type_deserializer.deserialize + + def to_tbl_resp(raw_client_response): + return { + "entity_id": deserialize(raw_client_response["entity_id"]), + "event_ts": deserialize(raw_client_response["event_ts"]), + "values": deserialize(raw_client_response["values"]), + } + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for batch, response in zip(batches, responses): - batch_result = self._process_batch_get_response(table_name, response, batch) + batch_result = self._process_batch_get_response( + table_name, response, batch, to_tbl_response=to_tbl_resp + ) result.extend(batch_result) return result diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 79da514d767..7e5558e19d7 100644 --- 
a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -946,15 +946,15 @@ def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store): @mock_dynamodb -def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store): - """Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size. +def test_dynamodb_online_store_max_workers_capped_at_config(dynamodb_online_store): + """Verify ThreadPoolExecutor max_workers uses max_read_workers config. Bug: Old code used min(len(batches), batch_size) which fails with small batch_size. - Fix: New code uses min(len(batches), 10) to ensure proper parallelization. + Fix: New code uses min(len(batches), max_read_workers) for proper parallelization. This test uses batch_size=5 with 15 batches to expose the bug: - OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism) - - NEW (fixed): max_workers = min(15, 10) = 10 (correct cap) + - NEW (fixed): max_workers = min(15, 10) = 10 (uses max_read_workers default) """ # Use small batch_size to expose the bug small_batch_config = RepoConfig( @@ -998,13 +998,14 @@ def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store): @mock_dynamodb -def test_dynamodb_online_store_thread_safety_new_resource_per_thread( +def test_dynamodb_online_store_thread_safety_uses_shared_client( dynamodb_online_store, ): - """Verify each thread creates its own boto3 resource for thread-safety. + """Verify multi-batch reads use a shared thread-safe boto3 client. - boto3 resources are NOT thread-safe, so we must create a new resource - per thread when using ThreadPoolExecutor. + boto3 clients ARE thread-safe, so we share a single client across threads + for better performance (avoids creating new sessions per thread). 
+ https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients """ config = RepoConfig( registry=REGISTRY, @@ -1023,16 +1024,16 @@ def test_dynamodb_online_store_thread_safety_new_resource_per_thread( entity_keys, features, *rest = zip(*data) - # Track resources created to verify thread-safety - resources_created = [] - original_init = boto3.resource + # Track clients created to verify thread-safety via shared client + clients_created = [] + original_client = boto3.client - def tracking_resource(*args, **kwargs): - resource = original_init(*args, **kwargs) - resources_created.append(id(resource)) - return resource + def tracking_client(*args, **kwargs): + client = original_client(*args, **kwargs) + clients_created.append(id(client)) + return client - with patch.object(boto3, "resource", side_effect=tracking_resource): + with patch.object(boto3, "client", side_effect=tracking_client): returned_items = dynamodb_online_store.online_read( config=config, table=MockFeatureView(name=db_table_name), @@ -1042,18 +1043,10 @@ def tracking_resource(*args, **kwargs): # Verify results are correct (functional correctness) assert len(returned_items) == n_samples - # Verify multiple resources were created (thread-safety) - # Each of the 3 batches should create its own resource - # (plus potentially 1 for _get_dynamodb_resource cache initialization) - assert len(resources_created) >= 3, ( - f"Expected at least 3 unique resources for 3 batches, " - f"got {len(resources_created)}" - ) - - # Verify the resources are actually different objects (not reused) - # At least the batch resources should be unique - unique_resources = set(resources_created) - assert len(unique_resources) >= 3, ( - f"Expected at least 3 unique resource IDs, " - f"got {len(unique_resources)} unique out of {len(resources_created)}" + # Verify only one client was created (shared across threads) + # The client is cached and reused for all batch requests + dynamodb_clients 
= [c for c in clients_created] + assert len(set(dynamodb_clients)) == 1, ( + f"Expected 1 shared client for thread-safety, " + f"got {len(set(dynamodb_clients))} unique clients" ) From 2dce44b4faf42bcd5a1b72f57edda30a1a1e3b82 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Mon, 2 Mar 2026 19:14:12 +0530 Subject: [PATCH 5/6] docs: add max_read_workers config documentation for DynamoDB online store Signed-off-by: abhijeet-dhumal --- docs/reference/online-stores/dynamodb.md | 41 ++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/reference/online-stores/dynamodb.md b/docs/reference/online-stores/dynamodb.md index 344caccac1d..747c3cc2087 100644 --- a/docs/reference/online-stores/dynamodb.md +++ b/docs/reference/online-stores/dynamodb.md @@ -22,6 +22,47 @@ online_store: The full set of configuration options is available in [DynamoDBOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.dynamodb.DynamoDBOnlineStoreConfig). +## Configuration + +Below is an example with performance tuning options: + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: aws +online_store: + type: dynamodb + region: us-west-2 + batch_size: 100 + max_read_workers: 10 + consistent_reads: false +``` +{% endcode %} + +### Configuration Options + +| Option | Type | Default | Description | +| ------ | ---- | ------- | ----------- | +| `region` | string | | AWS region for DynamoDB | +| `table_name_template` | string | `{project}.{table_name}` | Template for table names | +| `batch_size` | int | `40` | Number of items per BatchGetItem/BatchWriteItem request (max 100) | +| `max_read_workers` | int | `10` | Maximum parallel threads for batch read operations. 
Higher values improve throughput for large batch reads but increase resource usage | +| `consistent_reads` | bool | `false` | Whether to use strongly consistent reads (higher latency, guaranteed latest data) | +| `tags` | dict | `null` | AWS resource tags added to each table | +| `session_based_auth` | bool | `false` | Use AWS session-based client authentication | + +### Performance Tuning + +**Parallel Batch Reads**: When reading features for many entities, DynamoDB's BatchGetItem is limited to 100 items per request. For 500 entities, this requires 5 batch requests. The `max_read_workers` option controls how many of these batches execute in parallel: + +- **Sequential (old behavior)**: 5 batches × 10ms = 50ms total +- **Parallel (with `max_read_workers: 10`)**: 5 batches in parallel ≈ 10ms total + +For high-throughput workloads with large entity counts, increase `max_read_workers` (up to 20-30) based on your DynamoDB capacity and network conditions. + +**Batch Size**: Increase `batch_size` up to 100 to reduce the number of API calls. However, larger batches may hit DynamoDB's 16MB response limit for tables with large feature values. 
+ ## Permissions Feast requires the following permissions in order to execute commands for DynamoDB online store: From 8ead3149d1021785d2785d7d1f27b8853494b9c6 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 4 Mar 2026 11:44:53 +0530 Subject: [PATCH 6/6] docs: Fix default max worker count in docs Signed-off-by: abhijeet-dhumal --- docs/reference/online-stores/dynamodb.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/online-stores/dynamodb.md b/docs/reference/online-stores/dynamodb.md index 747c3cc2087..ec0104172fb 100644 --- a/docs/reference/online-stores/dynamodb.md +++ b/docs/reference/online-stores/dynamodb.md @@ -46,7 +46,7 @@ online_store: | ------ | ---- | ------- | ----------- | | `region` | string | | AWS region for DynamoDB | | `table_name_template` | string | `{project}.{table_name}` | Template for table names | -| `batch_size` | int | `40` | Number of items per BatchGetItem/BatchWriteItem request (max 100) | +| `batch_size` | int | `100` | Number of items per BatchGetItem/BatchWriteItem request (max 100) | | `max_read_workers` | int | `10` | Maximum parallel threads for batch read operations. Higher values improve throughput for large batch reads but increase resource usage | | `consistent_reads` | bool | `false` | Whether to use strongly consistent reads (higher latency, guaranteed latest data) | | `tags` | dict | `null` | AWS resource tags added to each table |