Skip to content

Commit a2e9209

Browse files
authored
fix: Dynamodb drops missing entities when batching (#2802)
* fixes dynamodb batch dropping missing entities Signed-off-by: Andrew Pope <apope@nursefly.com> * add unit tests Signed-off-by: Andrew Pope <apope@nursefly.com>
1 parent 1808e06 commit a2e9209

2 files changed

Lines changed: 49 additions & 4 deletions

File tree

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ def online_read(
221221
entity_ids_iter = iter(entity_ids)
222222
while True:
223223
batch = list(itertools.islice(entity_ids_iter, batch_size))
224+
batch_result: List[
225+
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
226+
] = []
224227
# No more items to insert
225228
if len(batch) == 0:
226229
break
@@ -243,20 +246,23 @@ def online_read(
243246
for tbl_res in table_responses:
244247
entity_id = tbl_res["entity_id"]
245248
while entity_id != batch[entity_idx]:
246-
result.append((None, None))
249+
batch_result.append((None, None))
247250
entity_idx += 1
248251
res = {}
249252
for feature_name, value_bin in tbl_res["values"].items():
250253
val = ValueProto()
251254
val.ParseFromString(value_bin.value)
252255
res[feature_name] = val
253-
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
256+
batch_result.append(
257+
(datetime.fromisoformat(tbl_res["event_ts"]), res)
258+
)
254259
entity_idx += 1
255260

256261
# Not all entities in a batch may have responses
257262
# Pad with remaining values in batch that were not found
258-
batch_size_nones = ((None, None),) * (len(batch) - len(result))
259-
result.extend(batch_size_nones)
263+
batch_size_nones = ((None, None),) * (len(batch) - len(batch_result))
264+
batch_result.extend(batch_size_nones)
265+
result.extend(batch_result)
260266
return result
261267

262268
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,3 +318,42 @@ def test_write_batch_non_duplicates(repo_config, dynamodb_online_store):
318318
returned_items = response.get("Items", None)
319319
assert returned_items is not None
320320
assert len(returned_items) == len(data)
321+
322+
323+
@mock_dynamodb2
324+
def test_dynamodb_online_store_online_read_unknown_entity_end_of_batch(
325+
repo_config, dynamodb_online_store
326+
):
327+
"""
328+
Test DynamoDBOnlineStore online_read method with unknown entities at
329+
the end of the batch.
330+
"""
331+
batch_size = repo_config.online_store.batch_size
332+
n_samples = batch_size
333+
_create_test_table(PROJECT, f"{TABLE_NAME}_unknown_entity_{n_samples}", REGION)
334+
data = _create_n_customer_test_samples(n=n_samples)
335+
_insert_data_test_table(
336+
data, PROJECT, f"{TABLE_NAME}_unknown_entity_{n_samples}", REGION
337+
)
338+
339+
entity_keys, features, *rest = zip(*data)
340+
entity_keys = list(entity_keys)
341+
features = list(features)
342+
343+
# Append a nonsensical entity to search for as the only item in the 2nd batch
344+
entity_keys.append(
345+
EntityKeyProto(
346+
join_keys=["customer"], entity_values=[ValueProto(string_val="12359")]
347+
)
348+
)
349+
features.append(None)
350+
351+
returned_items = dynamodb_online_store.online_read(
352+
config=repo_config,
353+
table=MockFeatureView(name=f"{TABLE_NAME}_unknown_entity_{n_samples}"),
354+
entity_keys=entity_keys,
355+
)
356+
357+
# ensure the entity is not dropped
358+
assert len(returned_items) == len(entity_keys)
359+
assert returned_items[-1] == (None, None)

0 commit comments

Comments
 (0)