Merged
41 changes: 41 additions & 0 deletions docs/reference/online-stores/dynamodb.md
@@ -22,6 +22,47 @@ online_store:

The full set of configuration options is available in [DynamoDBOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.dynamodb.DynamoDBOnlineStoreConfig).

## Configuration

Below is an example with performance-tuning options:

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
registry: data/registry.db
provider: aws
online_store:
type: dynamodb
region: us-west-2
batch_size: 100
max_read_workers: 10
consistent_reads: false
```
{% endcode %}

### Configuration Options

| Option | Type | Default | Description |
| ------ | ---- | ------- | ----------- |
| `region` | string | | AWS region for DynamoDB |
| `table_name_template` | string | `{project}.{table_name}` | Template for table names |
| `batch_size` | int | `100` | Number of items per BatchGetItem/BatchWriteItem request (max 100) |
| `max_read_workers` | int | `10` | Maximum parallel threads for batch read operations. Higher values improve throughput for large batch reads but increase resource usage |
| `consistent_reads` | bool | `false` | Whether to use strongly consistent reads (higher latency, guaranteed latest data) |
| `tags` | dict | `null` | AWS resource tags added to each table |
| `session_based_auth` | bool | `false` | Use AWS session-based client authentication |

### Performance Tuning

**Parallel Batch Reads**: When reading features for many entities, DynamoDB's BatchGetItem is limited to 100 items per request. For 500 entities, this requires 5 batch requests. The `max_read_workers` option controls how many of these batches execute in parallel:

- **Sequential (old behavior)**: 5 batches × ~10ms each ≈ 50ms total
- **Parallel (with `max_read_workers: 10`)**: 5 batches in flight at once ≈ ~10ms total

For high-throughput workloads with large entity counts, increase `max_read_workers` (up to 20-30) based on your DynamoDB capacity and network conditions.
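The batching-plus-parallelism strategy described above can be sketched as follows. This is an illustrative standalone sketch, not Feast's actual implementation: `split_into_batches`, `parallel_read`, and the injected `fetch_batch` callable are hypothetical names, and a real `fetch_batch` would call boto3's `batch_get_item`.

```python
# Sketch of parallel DynamoDB batch reads (hypothetical helper names).
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List

BATCH_SIZE = 100  # DynamoDB's BatchGetItem hard limit per request


def split_into_batches(
    entity_ids: List[str], batch_size: int = BATCH_SIZE
) -> List[List[str]]:
    """Split entity ids into chunks of at most batch_size."""
    it = iter(entity_ids)
    batches: List[List[str]] = []
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            break
        batches.append(batch)
    return batches


def parallel_read(
    entity_ids: List[str],
    fetch_batch: Callable[[List[str]], Any],
    max_read_workers: int = 10,
) -> List[Any]:
    """Fetch all batches concurrently; executor.map preserves batch order."""
    batches = split_into_batches(entity_ids)
    # Never spawn more workers than there are batches (and at least one).
    max_workers = min(len(batches), max_read_workers) or 1
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fetch_batch, batches))
```

In a real deployment, `fetch_batch` would issue `client.batch_get_item(RequestItems=...)`; sharing one boto3 client across threads is safe, since boto3 clients are thread-safe.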

**Batch Size**: Increase `batch_size` up to 100 to reduce the number of API calls. However, larger batches may hit DynamoDB's 16MB response limit for tables with large feature values.
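The API-call count follows directly from the batch size: a read of `n` entities issues `ceil(n / batch_size)` BatchGetItem calls. A quick sketch (the helper name is illustrative, not part of Feast's API):

```python
# Illustrative helper: number of BatchGetItem calls for a read.
import math


def num_batch_calls(n_entities: int, batch_size: int) -> int:
    return math.ceil(n_entities / batch_size)


# 500 entities at the maximum batch_size of 100 -> 5 calls;
# shrinking batch_size to 25 quadruples the call count.
print(num_batch_calls(500, 100))
print(num_batch_calls(500, 25))
```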

## Permissions

Feast requires the following permissions in order to execute commands for the DynamoDB online store:
71 changes: 57 additions & 14 deletions sdk/python/feast/infra/online_stores/dynamodb.py
@@ -16,6 +16,7 @@
import itertools
import logging
from collections import OrderedDict, defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union

@@ -76,6 +77,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
session_based_auth: bool = False
"""AWS session based client authentication"""

max_read_workers: int = 10
"""Maximum number of parallel threads for batch read operations.
Higher values improve throughput for large batch reads but increase resource usage."""

max_pool_connections: int = 50
"""Max number of connections for async Dynamodb operations.
Increase for high-throughput workloads."""
@@ -479,33 +484,71 @@ def online_read(
            online_config.endpoint_url,
            online_config.session_based_auth,
        )
        table_name = _get_table_name(online_config, config, table)

        batch_size = online_config.batch_size
        entity_ids = self._to_entity_ids(config, entity_keys)

        # Split entity_ids into batches upfront
        batches: List[List[str]] = []
        entity_ids_iter = iter(entity_ids)
        while True:
            batch = list(itertools.islice(entity_ids_iter, batch_size))
            if not batch:
                break
            batches.append(batch)

        if not batches:
            return []

        # For single batch, no parallelization overhead needed
        if len(batches) == 1:
            batch_entity_ids = self._to_resource_batch_get_payload(
                online_config, table_name, batches[0]
            )
            response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
            return self._process_batch_get_response(table_name, response, batches[0])

        # Execute batch requests in parallel for multiple batches
        # Note: boto3 clients ARE thread-safe, so we can share a single client
        # https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
        dynamodb_client = self._get_dynamodb_client(
            online_config.region,
            online_config.endpoint_url,
            online_config.session_based_auth,
        )

        def fetch_batch(batch: List[str]) -> Dict[str, Any]:
            batch_entity_ids = self._to_client_batch_get_payload(
                online_config, table_name, batch
            )
            return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids)

        # Use ThreadPoolExecutor for parallel I/O
        max_workers = min(len(batches), online_config.max_read_workers)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            responses = list(executor.map(fetch_batch, batches))

        # Process responses and merge results in order
        # Client responses need deserialization (unlike resource responses)
        if self._type_deserializer is None:
            self._type_deserializer = TypeDeserializer()
        deserialize = self._type_deserializer.deserialize

        def to_tbl_resp(raw_client_response):
            return {
                "entity_id": deserialize(raw_client_response["entity_id"]),
                "event_ts": deserialize(raw_client_response["event_ts"]),
                "values": deserialize(raw_client_response["values"]),
            }

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for batch, response in zip(batches, responses):
            batch_result = self._process_batch_get_response(
                table_name, response, batch, to_tbl_response=to_tbl_resp
            )
            result.extend(batch_result)

        return result

async def online_read_async(