Skip to content

Commit eaaa872

Browse files
authored
chore: Support consistent reads in dynamodb (feast-dev#3119)
* chore: Support consistent reads in dynamodb Signed-off-by: Achal Shah <achals@gmail.com> * fix Signed-off-by: Achal Shah <achals@gmail.com> Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 9f7e557 commit eaaa872

3 files changed

Lines changed: 13 additions & 6 deletions

File tree

sdk/python/feast/infra/materialization/lambda/lambda_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ def _materialize_one(
227227

228228
logger.info(
229229
f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, "
230-
f"rows written: {output['written_rows']}"
230+
f"Output: {output}"
231231
)
232232

233233
for f in not_done:

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from datetime import datetime
1717
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
1818

19-
from pydantic import StrictStr
19+
from pydantic import StrictBool, StrictStr
2020
from pydantic.typing import Literal, Union
2121

2222
from feast import Entity, FeatureView, utils
@@ -63,6 +63,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
6363
table_name_template: StrictStr = "{project}.{table_name}"
6464
"""DynamoDB table name template"""
6565

66+
consistent_reads: StrictBool = False
67+
"""Whether to read from Dynamodb by forcing consistent reads"""
68+
6669

6770
class DynamoDBOnlineStore(OnlineStore):
6871
"""
@@ -237,12 +240,12 @@ def online_read(
237240
batch_entity_ids = {
238241
table_instance.name: {
239242
"Keys": [{"entity_id": entity_id} for entity_id in batch],
240-
"ConsistentRead": True,
243+
"ConsistentRead": online_config.consistent_reads,
241244
}
242245
}
243246
with tracing_span(name="remote_call"):
244247
response = dynamodb_resource.batch_get_item(
245-
RequestItems=batch_entity_ids
248+
RequestItems=batch_entity_ids,
246249
)
247250
response = response.get("Responses")
248251
table_responses = response.get(table_instance.name)

sdk/python/tests/integration/materialization/test_lambda.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,15 @@
2424
def test_lambda_materialization_consistency():
2525
lambda_config = IntegrationTestRepoConfig(
2626
provider="aws",
27-
online_store={"type": "dynamodb", "region": "us-west-2"},
27+
online_store={
28+
"type": "dynamodb",
29+
"region": "us-west-2",
30+
"consistent_reads": True,
31+
},
2832
offline_store_creator=RedshiftDataSourceCreator,
2933
batch_engine={
3034
"type": "lambda",
31-
"materialization_image": "402087665549.dkr.ecr.us-west-2.amazonaws.com/feast-lambda-consumer:v1",
35+
"materialization_image": "402087665549.dkr.ecr.us-west-2.amazonaws.com/feast-lambda-consumer:v2",
3236
"lambda_role": "arn:aws:iam::402087665549:role/lambda_execution_role",
3337
},
3438
registry_location=RegistryLocation.S3,

0 commit comments

Comments
 (0)