Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c34bf3e
feat: dynamodb onlin read in batches
TremaMiguel Mar 6, 2022
56d2d32
run linters and format
TremaMiguel Mar 6, 2022
7b98faf
feat: batch_size parameter
TremaMiguel Mar 6, 2022
94abfb6
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 6, 2022
b5c1a3d
docs: typo in batch_size description
TremaMiguel Mar 6, 2022
5a12856
trailing white space
TremaMiguel Mar 6, 2022
8bd2a84
fix: batch_size is last argument
TremaMiguel Mar 7, 2022
fb6eacb
test: dynamodb online store online_read in batches
TremaMiguel Mar 7, 2022
1bbd5dc
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia Mar 7, 2022
307bab9
test: mock dynamodb behavior
TremaMiguel Mar 7, 2022
e52a895
feat: batch_size value must be less than 40
TremaMiguel Mar 8, 2022
29b5cf6
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia Mar 9, 2022
97fd71f
feat: batch_size defaults to 40
TremaMiguel Mar 10, 2022
ddb3f0a
Merge branch 'feat/dynamo_db_online_write_read' of github.com:TremaMi…
TremaMiguel Mar 10, 2022
12064e4
feat: sort dynamodb responses
TremaMiguel Mar 11, 2022
449f60d
merge branch master into feat/dynamo_db_online_write_read
TremaMiguel Mar 12, 2022
3f72228
resolve merge conflicts
TremaMiguel Mar 12, 2022
7a4edbd
test online response proto with redshift:dynamodb
TremaMiguel Mar 12, 2022
3843a02
feat: consistency in batch_size process
TremaMiguel Mar 15, 2022
88e183e
fix: return batch_size times None
TremaMiguel Mar 16, 2022
23cb49a
remove debug code
TremaMiguel Mar 16, 2022
44f97e7
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 22, 2022
eaf4940
typo in docstring
TremaMiguel Mar 22, 2022
5bc54d3
batch_size in onlineconfigstore
TremaMiguel Mar 23, 2022
c7ab086
Merge branch 'master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: consistency in batch_size process
Signed-off-by: Miguel Trejo <armando.trejo.marrufo@gmail.com>
  • Loading branch information
TremaMiguel committed Mar 15, 2022
commit 3843a02f02275a5fc9b31e15e25b25c129d3e5ce
42 changes: 18 additions & 24 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
Expand Down Expand Up @@ -50,10 +51,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
"""Online store type selector"""

region: StrictStr
""" AWS Region Name """
"""AWS Region Name"""

table_name_template: StrictStr = "{project}.{table_name}"
""" DynamoDB table name template """
"""DynamoDB table name template"""

sort_response: bool = True
"""Wether or not to sort BatchGetItem response."""
Comment thread
adchia marked this conversation as resolved.
Outdated


class DynamoDBOnlineStore(OnlineStore):
Expand All @@ -63,10 +67,12 @@ class DynamoDBOnlineStore(OnlineStore):
Attributes:
_dynamodb_client: Boto3 DynamoDB client.
_dynamodb_resource: Boto3 DynamoDB resource.
_batch_size: Number of items to retrieve in a DynamoDB BatchGetItem call.
"""

_dynamodb_client = None
_dynamodb_resource = None
_batch_size = 40

@log_exceptions_and_usage(online_store="dynamodb")
def update(
Expand Down Expand Up @@ -196,7 +202,6 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
sort_response: bool = True,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.
Expand All @@ -208,7 +213,6 @@ def online_read(
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
sort_response: wether or not to sort DynamoDB responses by the entity_ids order.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
Expand All @@ -219,36 +223,26 @@ def online_read(

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]

len_entity_ids = len(entity_ids)
batch_size = 40
# Iterate until the end_index is the value len_entity_ids
iters = (
len_entity_ids // batch_size + 1
if len_entity_ids % batch_size > 0
else len_entity_ids // batch_size
)
for i in range(iters):
start_index = min(i * batch_size, len_entity_ids)
end_index = min(i * batch_size + batch_size, len_entity_ids)


batch_size = self._batch_size
sort_response = online_config.sort_response
entity_ids_iter = iter(entity_ids)
while True:
batch = list(itertools.islice(entity_ids_iter, batch_size))
# No more items to insert
if len(batch) == 0:
break
batch_entity_ids = {
table_instance.name: {
"Keys": [
{"entity_id": entity_id}
for entity_id in entity_ids[start_index:end_index]
]
"Keys": [{"entity_id": entity_id} for entity_id in batch]
}
}

with tracing_span(name="remote_call"):
response = dynamodb_resource.batch_get_item(
RequestItems=batch_entity_ids
)

response = response.get("Responses")
table_responses = response.get(table_instance.name)

if table_responses:
if sort_response:
table_responses = self._sort_dynamodb_response(
Comment thread
adchia marked this conversation as resolved.
Expand Down