Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
perf: Parallelize DynamoDB batch reads in sync online_read
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
  • Loading branch information
abhijeet-dhumal committed Mar 4, 2026
commit 6e65f4d50e6321ab32a821c3abf6710f81a6f8d9
50 changes: 34 additions & 16 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import itertools
import logging
from collections import OrderedDict, defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union

Expand Down Expand Up @@ -479,33 +480,50 @@ def online_read(
online_config.endpoint_url,
online_config.session_based_auth,
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
table_name = _get_table_name(online_config, config, table)
table_instance = dynamodb_resource.Table(table_name)
Comment thread
ntkathole marked this conversation as resolved.
Outdated

batch_size = online_config.batch_size
entity_ids = self._to_entity_ids(config, entity_keys)
entity_ids_iter = iter(entity_ids)
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

# Split entity_ids into batches upfront
batches: List[List[str]] = []
entity_ids_iter = iter(entity_ids)
while True:
batch = list(itertools.islice(entity_ids_iter, batch_size))

# No more items to insert
if len(batch) == 0:
if not batch:
break
batches.append(batch)

if not batches:
return []

# For single batch, no parallelization overhead needed
if len(batches) == 1:
batch_entity_ids = self._to_resource_batch_get_payload(
online_config, table_instance.name, batch
)
response = dynamodb_resource.batch_get_item(
RequestItems=batch_entity_ids,
online_config, table_instance.name, batches[0]
)
batch_result = self._process_batch_get_response(
table_instance.name,
response,
batch,
response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
return self._process_batch_get_response(table_name, response, batches[0])

# Execute batch requests in parallel for multiple batches
def fetch_batch(batch: List[str]) -> Dict[str, Any]:
batch_entity_ids = self._to_resource_batch_get_payload(
online_config, table_instance.name, batch
)
return dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)

# Use ThreadPoolExecutor for parallel I/O
max_workers = min(len(batches), batch_size)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Outdated
with ThreadPoolExecutor(max_workers=max_workers) as executor:
responses = list(executor.map(fetch_batch, batches))
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

# Process responses and merge results in order
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for batch, response in zip(batches, responses):
batch_result = self._process_batch_get_response(table_name, response, batch)
result.extend(batch_result)

return result

async def online_read_async(
Expand Down