Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add aiobotocore and online_read_async to dynamo read
Signed-off-by: robhowley <rhowley@seatgeek.com>
  • Loading branch information
robhowley committed May 31, 2024
commit bb0a4fba46dce41d5a7c0abcf305eac26d41dc90
128 changes: 94 additions & 34 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from aiobotocore import session
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

Expand Down Expand Up @@ -80,6 +81,7 @@ class DynamoDBOnlineStore(OnlineStore):

_dynamodb_client = None
_dynamodb_resource = None
_aioboto_session = None

def update(
self,
Expand Down Expand Up @@ -206,38 +208,9 @@ def online_write_batch(
)
self._write_batch_non_duplicates(table_instance, data, progress, config)

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.

Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)

def _read_batches(self, online_config, entity_ids, table_name, batch_get_item) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [
compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]

batch_size = online_config.batch_size
entity_ids_iter = iter(entity_ids)
while True:
Expand All @@ -249,16 +222,16 @@ def online_read(
if len(batch) == 0:
break
batch_entity_ids = {
table_instance.name: {
table_name: {
"Keys": [{"entity_id": entity_id} for entity_id in batch],
"ConsistentRead": online_config.consistent_reads,
}
}
response = dynamodb_resource.batch_get_item(
response = batch_get_item(
RequestItems=batch_entity_ids,
)
response = response.get("Responses")
table_responses = response.get(table_instance.name)
table_responses = response.get(table_name)
if table_responses:
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
Expand Down Expand Up @@ -286,6 +259,93 @@ def online_read(
result.extend(batch_result)
return result

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.

Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)

entity_ids = [
compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]

return self._read_batches(
online_config,
entity_ids,
table_instance.name,
dynamodb_resource.batch_get_item,
)

async def online_read_async(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Reads features values for the given entity keys asynchronously.

Args:
config: The config for the current feature store.
table: The feature view whose feature values should be read.
entity_keys: The list of entity keys for which feature values should be read.
requested_features: The list of features that should be read.

Returns:
A list of the same length as entity_keys. Each item in the list is a tuple where the first
item is the event timestamp for the row, and the second item is a dict mapping feature names
to values, which are returned in proto format.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)

entity_ids = [
compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]

async with self._get_aiodynamodb_client(online_config.region) as client:
return self._read_batches(
online_config,
entity_ids,
_get_table_name(online_config, config, table),
client.batch_get_item
)

def _get_aioboto_session(self):
if self._aioboto_session is None:
self._aioboto_session = session.get_session()
return self._aioboto_session

def _get_aiodynamodb_client(self, region: str):
return self._get_aioboto_session().create_client("dynamodb", region_name=region)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not too sure about this, but is it a good idea to recreate client object on every call? Isn't there a performance penalty?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the docs demonstrate and recommend usage within an async context manager. it is possible manually work with _exit_stack.enter_async_context at app startup/shutdown but wanted to avoid using protected methods and adding app complexity without first knowing it was really needed. seemed like an "as needed follow up" type of investigation.


def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"hiredis>=2.0.0,<3",
]

AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0"]
AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0", "aiobotocore>=2.13.0"]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the lower bound necessary here? maybe just put a >2,<3 unless there's some specific reason


KUBERNETES_REQUIRED = ["kubernetes<=20.13.0"]

Expand Down