Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move retry configutation into top level, remove max_attempts
Signed-off-by: Sebastian Jäger <sebastjaeger@gmail.com>
  • Loading branch information
sebastjaeger committed Mar 21, 2025
commit 86be5fb1966fa7ac879f420a9ee59b750f4912e1
50 changes: 26 additions & 24 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,6 @@
logger = logging.getLogger(__name__)


class DynamoDBOnlineStoreRetryConfig(FeastConfigBaseModel):
"""Async online store retry configuration for DynamoDB store.

Cf. https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
for details.
"""

total_max_attempts: Union[int, None] = None
"""Maximum number of total attempts that will be made on a single request."""

max_attempts: Union[int, None] = None
"""Maximum number of retry attempts that will be made on a single request."""

mode: Union[Literal["legacy", "standard", "adaptive"], None] = None
"""The type of retry mode (aio)botocore should use."""


class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for DynamoDB store"""

Expand Down Expand Up @@ -110,8 +93,16 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
"""The time in seconds until a timeout exception is thrown when attempting to read
from an async connection."""

retries: DynamoDBOnlineStoreRetryConfig = DynamoDBOnlineStoreRetryConfig()
"""Configuration for retry behavior of async connections."""
total_max_retry_attempts: Union[int, None] = None
"""Maximum number of total attempts that will be made on a single request.

Cf.
https://github.com/boto/botocore/blob/dd8406d5fa1df18037d1dd2977aec47334f7e3ce/botocore/args.py#L558
as for why `max_attempts` is not exposed here.
"""

retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None
"""The type of retry mode (aio)botocore should use."""


class DynamoDBOnlineStore(OnlineStore):
Expand All @@ -135,7 +126,8 @@ async def initialize(self, config: RepoConfig):
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.retries,
online_config.total_max_retry_attempts,
online_config.retry_mode,
)

async def close(self):
Expand Down Expand Up @@ -315,7 +307,8 @@ async def online_write_batch_async(
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.retries,
online_config.total_max_retry_attempts,
online_config.retry_mode,
)
await dynamo_write_items_async(client, table_name, items)

Expand Down Expand Up @@ -425,7 +418,8 @@ def to_tbl_resp(raw_client_response):
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.retries,
online_config.total_max_retry_attempts,
online_config.retry_mode,
)
response_batches = await asyncio.gather(
*[
Expand Down Expand Up @@ -590,19 +584,27 @@ async def _get_aiodynamodb_client(
keepalive_timeout: float,
connect_timeout: Union[int, float],
read_timeout: Union[int, float],
retries: DynamoDBOnlineStoreRetryConfig,
total_max_retry_attempts: Union[int, None],
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
):
global _aioboto_client
if _aioboto_client is None:
logger.debug("initializing the aiobotocore dynamodb client")

retries = {}
if total_max_retry_attempts is not None:
retries["total_max_attempts"] = total_max_retry_attempts
if retry_mode is not None:
retries["mode"] = retry_mode

client_context = _get_aioboto_session().create_client(
"dynamodb",
region_name=region,
config=AioConfig(
max_pool_connections=max_pool_connections,
connect_timeout=connect_timeout,
read_timeout=read_timeout,
retries=retries.model_dump(exclude_none=True),
retries=retries,
connector_args={"keepalive_timeout": keepalive_timeout},
),
)
Expand Down