Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add async DynamoDB timeout and retry configuration
Signed-off-by: Sebastian Jäger <sebastjaeger@gmail.com>
  • Loading branch information
sebastjaeger committed Mar 21, 2025
commit 61bea086fbbafe189c37929cef43818af76dec6f
55 changes: 51 additions & 4 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@
logger = logging.getLogger(__name__)


class DynamoDBOnlineStoreRetryConfig(FeastConfigBaseModel):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i would move this into DynamoDBOnlineStoreConfig.

"""Async online store retry configuration for DynamoDB store.

Cf. https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
for details.
"""

total_max_attempts: int | None = None
"""Maximum number of total attempts that will be made on a single request."""

max_attempts: int | None = None
"""Maximum number of retry attempts that will be made on a single request."""

mode: Literal["legacy", "standard", "adaptive"] | None = None
"""The type of retry mode (aio)botocore should use."""


class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for DynamoDB store"""

Expand Down Expand Up @@ -85,6 +102,17 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
keepalive_timeout: float = 12.0
"""Keep-alive timeout in seconds for async Dynamodb connections."""

connect_timeout: int | float = 60
"""The time in seconds until a timeout exception is thrown when attempting to make
an async connection."""

read_timeout: int | float = 60
"""The time in seconds until a timeout exception is thrown when attempting to read
from an async connection."""

retries: DynamoDBOnlineStoreRetryConfig = DynamoDBOnlineStoreRetryConfig()
"""Configuration for retry behavior of async connections."""


class DynamoDBOnlineStore(OnlineStore):
"""
Expand All @@ -99,10 +127,15 @@ class DynamoDBOnlineStore(OnlineStore):
_dynamodb_resource = None

async def initialize(self, config: RepoConfig):
online_config = config.online_store

await _get_aiodynamodb_client(
config.online_store.region,
config.online_store.max_pool_connections,
config.online_store.keepalive_timeout,
online_config.region,
online_config.max_pool_connections,
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.retries,
)

async def close(self):
Expand Down Expand Up @@ -280,6 +313,9 @@ async def online_write_batch_async(
online_config.region,
online_config.max_pool_connections,
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.retries,
)
await dynamo_write_items_async(client, table_name, items)

Expand Down Expand Up @@ -387,6 +423,9 @@ def to_tbl_resp(raw_client_response):
online_config.region,
online_config.max_pool_connections,
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.retries,
)
response_batches = await asyncio.gather(
*[
Expand Down Expand Up @@ -546,7 +585,12 @@ def _get_aioboto_session():


async def _get_aiodynamodb_client(
region: str, max_pool_connections: int, keepalive_timeout: float
region: str,
max_pool_connections: int,
keepalive_timeout: float,
connect_timeout: int | float,
read_timeout: int | float,
retries: DynamoDBOnlineStoreRetryConfig,
):
global _aioboto_client
if _aioboto_client is None:
Expand All @@ -556,6 +600,9 @@ async def _get_aiodynamodb_client(
region_name=region,
config=AioConfig(
max_pool_connections=max_pool_connections,
connect_timeout=connect_timeout,
read_timeout=read_timeout,
retries=retries.model_dump(exclude_none=True),
connector_args={"keepalive_timeout": keepalive_timeout},
),
)
Expand Down