Skip to content

Commit b401f42

Browse files
abhijeet-dhumal and ntkathole
authored and committed
perf: Parallelize DynamoDB batch reads in sync online_read (feast-dev#6024)
* perf: Parallelize DynamoDB batch reads in sync online_read Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> * test: add unit tests for DynamoDB parallel batch reads Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> * fix: address thread-safety and max_workers issues in parallel DynamoDB reads Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> * fix: Improve DynamoDB parallel reads: shared client, configurable workers Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> * docs: add max_read_workers config documentation for DynamoDB online store Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> * docs: Fix default max worker count in docs Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> --------- Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
1 parent b63b240 commit b401f42

3 files changed

Lines changed: 368 additions & 14 deletions

File tree

docs/reference/online-stores/dynamodb.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,47 @@ online_store:
2222
2323
The full set of configuration options is available in [DynamoDBOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.dynamodb.DynamoDBOnlineStoreConfig).
2424
25+
## Configuration
26+
27+
Below is an example with performance tuning options:
28+
29+
{% code title="feature_store.yaml" %}
30+
```yaml
31+
project: my_feature_repo
32+
registry: data/registry.db
33+
provider: aws
34+
online_store:
35+
  type: dynamodb
36+
  region: us-west-2
37+
  batch_size: 100
38+
  max_read_workers: 10
39+
  consistent_reads: false
40+
```
41+
{% endcode %}
42+
43+
### Configuration Options
44+
45+
| Option | Type | Default | Description |
46+
| ------ | ---- | ------- | ----------- |
47+
| `region` | string | | AWS region for DynamoDB |
48+
| `table_name_template` | string | `{project}.{table_name}` | Template for table names |
49+
| `batch_size` | int | `100` | Number of items per BatchGetItem/BatchWriteItem request (max 100) |
50+
| `max_read_workers` | int | `10` | Maximum parallel threads for batch read operations. Higher values improve throughput for large batch reads but increase resource usage |
51+
| `consistent_reads` | bool | `false` | Whether to use strongly consistent reads (higher latency, guaranteed latest data) |
52+
| `tags` | dict | `null` | AWS resource tags added to each table |
53+
| `session_based_auth` | bool | `false` | Use AWS session-based client authentication |
54+
55+
### Performance Tuning
56+
57+
**Parallel Batch Reads**: When reading features for many entities, DynamoDB's BatchGetItem is limited to 100 items per request. For 500 entities, this requires 5 batch requests. The `max_read_workers` option controls how many of these batches execute in parallel:
58+
59+
- **Sequential (old behavior)**: 5 batches × 10ms = 50ms total
60+
- **Parallel (with `max_read_workers: 10`)**: 5 batches in parallel ≈ 10ms total
61+
62+
For high-throughput workloads with large entity counts, increase `max_read_workers` (up to 20-30) based on your DynamoDB capacity and network conditions.
63+
64+
**Batch Size**: Increase `batch_size` up to 100 to reduce the number of API calls. However, larger batches may hit DynamoDB's 16MB response limit for tables with large feature values.
65+
2566
## Permissions
2667

2768
Feast requires the following permissions in order to execute commands for DynamoDB online store:

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import itertools
1717
import logging
1818
from collections import OrderedDict, defaultdict
19+
from concurrent.futures import ThreadPoolExecutor
1920
from datetime import datetime
2021
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
2122

@@ -76,6 +77,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7677
session_based_auth: bool = False
7778
"""AWS session based client authentication"""
7879

80+
max_read_workers: int = 10
81+
"""Maximum number of parallel threads for batch read operations.
82+
Higher values improve throughput for large batch reads but increase resource usage."""
83+
7984
max_pool_connections: int = 50
8085
"""Max number of connections for async Dynamodb operations.
8186
Increase for high-throughput workloads."""
@@ -479,33 +484,71 @@ def online_read(
479484
online_config.endpoint_url,
480485
online_config.session_based_auth,
481486
)
482-
table_instance = dynamodb_resource.Table(
483-
_get_table_name(online_config, config, table)
484-
)
487+
table_name = _get_table_name(online_config, config, table)
485488

486489
batch_size = online_config.batch_size
487490
entity_ids = self._to_entity_ids(config, entity_keys)
488-
entity_ids_iter = iter(entity_ids)
489-
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
490491

492+
# Split entity_ids into batches upfront
493+
batches: List[List[str]] = []
494+
entity_ids_iter = iter(entity_ids)
491495
while True:
492496
batch = list(itertools.islice(entity_ids_iter, batch_size))
493-
494-
# No more items to insert
495-
if len(batch) == 0:
497+
if not batch:
496498
break
499+
batches.append(batch)
500+
501+
if not batches:
502+
return []
503+
504+
# For single batch, no parallelization overhead needed
505+
if len(batches) == 1:
497506
batch_entity_ids = self._to_resource_batch_get_payload(
498-
online_config, table_instance.name, batch
507+
online_config, table_name, batches[0]
499508
)
500-
response = dynamodb_resource.batch_get_item(
501-
RequestItems=batch_entity_ids,
509+
response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
510+
return self._process_batch_get_response(table_name, response, batches[0])
511+
512+
# Execute batch requests in parallel for multiple batches
513+
# Note: boto3 clients ARE thread-safe, so we can share a single client
514+
# https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
515+
dynamodb_client = self._get_dynamodb_client(
516+
online_config.region,
517+
online_config.endpoint_url,
518+
online_config.session_based_auth,
519+
)
520+
521+
def fetch_batch(batch: List[str]) -> Dict[str, Any]:
522+
batch_entity_ids = self._to_client_batch_get_payload(
523+
online_config, table_name, batch
502524
)
525+
return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids)
526+
527+
# Use ThreadPoolExecutor for parallel I/O
528+
max_workers = min(len(batches), online_config.max_read_workers)
529+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
530+
responses = list(executor.map(fetch_batch, batches))
531+
532+
# Process responses and merge results in order
533+
# Client responses need deserialization (unlike resource responses)
534+
if self._type_deserializer is None:
535+
self._type_deserializer = TypeDeserializer()
536+
deserialize = self._type_deserializer.deserialize
537+
538+
def to_tbl_resp(raw_client_response):
539+
return {
540+
"entity_id": deserialize(raw_client_response["entity_id"]),
541+
"event_ts": deserialize(raw_client_response["event_ts"]),
542+
"values": deserialize(raw_client_response["values"]),
543+
}
544+
545+
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
546+
for batch, response in zip(batches, responses):
503547
batch_result = self._process_batch_get_response(
504-
table_instance.name,
505-
response,
506-
batch,
548+
table_name, response, batch, to_tbl_response=to_tbl_resp
507549
)
508550
result.extend(batch_result)
551+
509552
return result
510553

511554
async def online_read_async(

0 commit comments

Comments
 (0)