Skip to content

Commit 3843a02

Browse files
committed
feat: consistency in batch_size process
Signed-off-by: Miguel Trejo <armando.trejo.marrufo@gmail.com>
1 parent 7a4edbd commit 3843a02

1 file changed

Lines changed: 18 additions & 24 deletions

File tree

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import itertools
1415
import logging
1516
from datetime import datetime
1617
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
@@ -50,10 +51,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5051
"""Online store type selector"""
5152

5253
region: StrictStr
53-
""" AWS Region Name """
54+
"""AWS Region Name"""
5455

5556
table_name_template: StrictStr = "{project}.{table_name}"
56-
""" DynamoDB table name template """
57+
"""DynamoDB table name template"""
58+
59+
sort_response: bool = True
60+
"""Whether or not to sort the BatchGetItem response."""
5761

5862

5963
class DynamoDBOnlineStore(OnlineStore):
@@ -63,10 +67,12 @@ class DynamoDBOnlineStore(OnlineStore):
6367
Attributes:
6468
_dynamodb_client: Boto3 DynamoDB client.
6569
_dynamodb_resource: Boto3 DynamoDB resource.
70+
_batch_size: Number of items to retrieve in a DynamoDB BatchGetItem call.
6671
"""
6772

6873
_dynamodb_client = None
6974
_dynamodb_resource = None
75+
_batch_size = 40
7076

7177
@log_exceptions_and_usage(online_store="dynamodb")
7278
def update(
@@ -196,7 +202,6 @@ def online_read(
196202
table: FeatureView,
197203
entity_keys: List[EntityKeyProto],
198204
requested_features: Optional[List[str]] = None,
199-
sort_response: bool = True,
200205
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
201206
"""
202207
Retrieve feature values from the online DynamoDB store.
@@ -208,7 +213,6 @@ def online_read(
208213
config: The RepoConfig for the current FeatureStore.
209214
table: Feast FeatureView.
210215
entity_keys: a list of entity keys that should be read from the FeatureStore.
211-
sort_response: wether or not to sort DynamoDB responses by the entity_ids order.
212216
"""
213217
online_config = config.online_store
214218
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
@@ -219,36 +223,26 @@ def online_read(
219223

220224
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
221225
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]
222-
223-
len_entity_ids = len(entity_ids)
224-
batch_size = 40
225-
# Iterate until the end_index is the value len_entity_ids
226-
iters = (
227-
len_entity_ids // batch_size + 1
228-
if len_entity_ids % batch_size > 0
229-
else len_entity_ids // batch_size
230-
)
231-
for i in range(iters):
232-
start_index = min(i * batch_size, len_entity_ids)
233-
end_index = min(i * batch_size + batch_size, len_entity_ids)
234-
226+
227+
batch_size = self._batch_size
228+
sort_response = online_config.sort_response
229+
entity_ids_iter = iter(entity_ids)
230+
while True:
231+
batch = list(itertools.islice(entity_ids_iter, batch_size))
232+
# No more entity ids left to read
233+
if len(batch) == 0:
234+
break
235235
batch_entity_ids = {
236236
table_instance.name: {
237-
"Keys": [
238-
{"entity_id": entity_id}
239-
for entity_id in entity_ids[start_index:end_index]
240-
]
237+
"Keys": [{"entity_id": entity_id} for entity_id in batch]
241238
}
242239
}
243-
244240
with tracing_span(name="remote_call"):
245241
response = dynamodb_resource.batch_get_item(
246242
RequestItems=batch_entity_ids
247243
)
248-
249244
response = response.get("Responses")
250245
table_responses = response.get(table_instance.name)
251-
252246
if table_responses:
253247
if sort_response:
254248
table_responses = self._sort_dynamodb_response(

0 commit comments

Comments
 (0)