Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c34bf3e
feat: dynamodb onlin read in batches
TremaMiguel Mar 6, 2022
56d2d32
run linters and format
TremaMiguel Mar 6, 2022
7b98faf
feat: batch_size parameter
TremaMiguel Mar 6, 2022
94abfb6
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 6, 2022
b5c1a3d
docs: typo in batch_size description
TremaMiguel Mar 6, 2022
5a12856
trailing white space
TremaMiguel Mar 6, 2022
8bd2a84
fix: batch_size is last argument
TremaMiguel Mar 7, 2022
fb6eacb
test: dynamodb online store online_read in batches
TremaMiguel Mar 7, 2022
1bbd5dc
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia Mar 7, 2022
307bab9
test: mock dynamodb behavior
TremaMiguel Mar 7, 2022
e52a895
feat: batch_size value must be less than 40
TremaMiguel Mar 8, 2022
29b5cf6
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia Mar 9, 2022
97fd71f
feat: batch_size defaults to 40
TremaMiguel Mar 10, 2022
ddb3f0a
Merge branch 'feat/dynamo_db_online_write_read' of github.com:TremaMi…
TremaMiguel Mar 10, 2022
12064e4
feat: sort dynamodb responses
TremaMiguel Mar 11, 2022
449f60d
merge branch master into feat/dynamo_db_online_write_read
TremaMiguel Mar 12, 2022
3f72228
resolve merge conflicts
TremaMiguel Mar 12, 2022
7a4edbd
test online response proto with redshift:dynamodb
TremaMiguel Mar 12, 2022
3843a02
feat: consistency in batch_size process
TremaMiguel Mar 15, 2022
88e183e
fix: return batch_size times None
TremaMiguel Mar 16, 2022
23cb49a
remove debug code
TremaMiguel Mar 16, 2022
44f97e7
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 22, 2022
eaf4940
typo in docstring
TremaMiguel Mar 22, 2022
5bc54d3
batch_size in onlineconfigstore
TremaMiguel Mar 23, 2022
c7ab086
Merge branch 'master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: sort dynamodb responses
Signed-off-by: Miguel Trejo <armando.trejo.marrufo@gmail.com>
  • Loading branch information
TremaMiguel committed Mar 11, 2022
commit 12064e4a4cb665b21dbdfbad9091d6417897fb8e
20 changes: 20 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
sort_response: bool = True,
Comment thread
TremaMiguel marked this conversation as resolved.
Outdated
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.
Expand All @@ -199,6 +200,7 @@ def online_read(
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
sort_response: whether or not to sort DynamoDB responses by the entity_ids order.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
Expand Down Expand Up @@ -238,6 +240,10 @@ def online_read(
table_responses = response.get(table_instance.name)

if table_responses:
if sort_response:
table_responses = self._sort_dynamodb_response(
Comment thread
adchia marked this conversation as resolved.
table_responses, entity_ids
)
for tbl_res in table_responses:
res = {}
for feature_name, value_bin in tbl_res["values"].items():
Expand All @@ -259,6 +265,20 @@ def _get_dynamodb_resource(self, region: str):
self._dynamodb_resource = _initialize_dynamodb_resource(region)
return self._dynamodb_resource

def _sort_dynamodb_response(self, responses: list, order: list):
"""DynamoDB Batch Get Item doesn't return items in a particular order."""
# Assign an index to order
order_with_index = {value: idx for idx, value in enumerate(order)}
# Sort table responses by index
table_responses_ordered = [
(order_with_index[tbl_res["entity_id"]], tbl_res) for tbl_res in responses
]
table_responses_ordered = sorted(
table_responses_ordered, key=lambda tup: tup[0]
)
_, table_responses_ordered = zip(*table_responses_ordered)
return table_responses_ordered


def _initialize_dynamodb_client(region: str):
    """Build a boto3 client for the ``dynamodb`` service in ``region``."""
    dynamodb_client = boto3.client("dynamodb", region_name=region)
    return dynamodb_client
Expand Down