Skip to content

Commit ece23e1

Browse files
committed
add async writer for dynamo
Signed-off-by: Rob Howley <howley.robert@gmail.com>
1 parent 87d59fe commit ece23e1

File tree

29 files changed

+684
-176
lines changed

29 files changed

+684
-176
lines changed

CODEOWNERS

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
# See https://help.github.com/articles/about-codeowners/
22
# for more info about CODEOWNERS file
33

4+
/docs/ @feast-dev/reviewers-and-approvers
5+
/examples/ @feast-dev/reviewers-and-approvers
6+
/go/ @feast-dev/reviewers-and-approvers
7+
/infra/ @feast-dev/reviewers-and-approvers
8+
/java/ @feast-dev/reviewers-and-approvers
9+
/protos/ @feast-dev/reviewers-and-approvers
10+
/sdk/ @feast-dev/reviewers-and-approvers
11+
/ui/ @feast-dev/reviewers-and-approvers
12+
413
# Core Interfaces
514
/sdk/python/feast/infra/offline_stores/offline_store.py @feast-dev/maintainers
615
/sdk/python/feast/infra/online_stores/online_store.py @feast-dev/maintainers

OWNERS

Lines changed: 0 additions & 47 deletions
This file was deleted.

sdk/python/feast/entity.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,20 @@ def __init__(
9999
self.created_timestamp = None
100100
self.last_updated_timestamp = None
101101

102+
def __repr__(self):
103+
return (
104+
f"Entity(\n"
105+
f" name={self.name!r},\n"
106+
f" value_type={self.value_type!r},\n"
107+
f" join_key={self.join_key!r},\n"
108+
f" description={self.description!r},\n"
109+
f" tags={self.tags!r},\n"
110+
f" owner={self.owner!r},\n"
111+
f" created_timestamp={self.created_timestamp!r},\n"
112+
f" last_updated_timestamp={self.last_updated_timestamp!r}\n"
113+
f")"
114+
)
115+
102116
def __hash__(self) -> int:
103117
return hash((self.name, self.join_key))
104118

sdk/python/feast/feature.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,18 @@ def __lt__(self, other):
5858
return self.name < other.name
5959

6060
def __repr__(self):
61-
# return string representation of the reference
62-
return f"{self.name}-{self.dtype}"
61+
return (
62+
f"Feature(\n"
63+
f" name={self._name!r},\n"
64+
f" dtype={self._dtype!r},\n"
65+
f" description={self._description!r},\n"
66+
f" labels={self._labels!r}\n"
67+
f")"
68+
)
6369

6470
def __str__(self):
6571
# readable string of the reference
66-
return f"Feature<{self.__repr__()}>"
72+
return f"Feature<{self.name}: {self.dtype}>"
6773

6874
@property
6975
def name(self):

sdk/python/feast/feature_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ def async_refresh():
100100

101101
@asynccontextmanager
102102
async def lifespan(app: FastAPI):
103-
async_refresh()
104103
await store.initialize()
104+
async_refresh()
105105
yield
106106
stop_refresh()
107107
await store.close()

sdk/python/feast/feature_store.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,16 @@ def version(self) -> str:
175175
"""Returns the version of the current Feast SDK/CLI."""
176176
return get_version()
177177

178+
def __repr__(self) -> str:
179+
return (
180+
f"FeatureStore(\n"
181+
f" repo_path={self.repo_path!r},\n"
182+
f" config={self.config!r},\n"
183+
f" registry={self._registry!r},\n"
184+
f" provider={self._provider!r}\n"
185+
f")"
186+
)
187+
178188
@property
179189
def registry(self) -> BaseRegistry:
180190
"""Gets the registry of this feature store."""

sdk/python/feast/field.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,14 @@ def __lt__(self, other):
8181
return self.name < other.name
8282

8383
def __repr__(self):
84-
return f"Field(name='{self.name}', dtype={self.dtype}, description='{self.description}' tags={self.tags})"
84+
return (
85+
f"Field(\n"
86+
f" name={self.name!r},\n"
87+
f" dtype={self.dtype!r},\n"
88+
f" description={self.description!r},\n"
89+
f" tags={self.tags!r}\n"
90+
f")"
91+
)
8592

8693
def __str__(self):
8794
return f"Field(name={self.name}, dtype={self.dtype}, tags={self.tags})"

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,3 @@ def get_table_column_names_and_types_from_data_source(
388388
data_source: DataSource object
389389
"""
390390
return data_source.get_table_column_names_and_types(config=config)
391-
392-
async def initialize(self, config: RepoConfig) -> None:
393-
pass
394-
395-
async def close(self) -> None:
396-
pass

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from feast.infra.online_stores.helpers import compute_entity_id
2727
from feast.infra.online_stores.online_store import OnlineStore
2828
from feast.infra.supported_async_methods import SupportedAsyncMethods
29+
from feast.infra.utils.aws_utils import dynamo_write_items_async
2930
from feast.protos.feast.core.DynamoDBTable_pb2 import (
3031
DynamoDBTable as DynamoDBTableProto,
3132
)
@@ -103,7 +104,7 @@ async def close(self):
103104

104105
@property
105106
def async_supported(self) -> SupportedAsyncMethods:
106-
return SupportedAsyncMethods(read=True)
107+
return SupportedAsyncMethods(read=True, write=True)
107108

108109
def update(
109110
self,
@@ -238,6 +239,42 @@ def online_write_batch(
238239
)
239240
self._write_batch_non_duplicates(table_instance, data, progress, config)
240241

242+
async def online_write_batch_async(
243+
self,
244+
config: RepoConfig,
245+
table: FeatureView,
246+
data: List[
247+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
248+
],
249+
progress: Optional[Callable[[int], Any]],
250+
) -> None:
251+
"""
252+
Writes a batch of feature rows to the online store asynchronously.
253+
254+
If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
255+
256+
Args:
257+
config: The config for the current feature store.
258+
table: Feature view to which these feature rows correspond.
259+
data: A list of quadruplets containing feature data. Each quadruplet contains an entity
260+
key, a dict containing feature values, an event timestamp for the row, and the created
261+
timestamp for the row if it exists.
262+
progress: Function to be called once a batch of rows is written to the online store, used
263+
to show progress.
264+
"""
265+
online_config = config.online_store
266+
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
267+
268+
table_name = _get_table_name(online_config, config, table)
269+
items = [
270+
_to_write_item(config, entity_key, features, timestamp)
271+
for entity_key, features, timestamp, _ in data
272+
]
273+
client = _get_aiodynamodb_client(
274+
online_config.region, config.online_store.max_pool_connections
275+
)
276+
await dynamo_write_items_async(client, table_name, items)
277+
241278
def online_read(
242279
self,
243280
config: RepoConfig,
@@ -419,19 +456,8 @@ def _write_batch_non_duplicates(
419456
"""Deduplicate write batch request items on ``entity_id`` primary key."""
420457
with table_instance.batch_writer(overwrite_by_pkeys=["entity_id"]) as batch:
421458
for entity_key, features, timestamp, created_ts in data:
422-
entity_id = compute_entity_id(
423-
entity_key,
424-
entity_key_serialization_version=config.entity_key_serialization_version,
425-
)
426459
batch.put_item(
427-
Item={
428-
"entity_id": entity_id, # PartitionKey
429-
"event_ts": str(utils.make_tzaware(timestamp)),
430-
"values": {
431-
k: v.SerializeToString()
432-
for k, v in features.items() # Serialized Features
433-
},
434-
}
460+
Item=_to_write_item(config, entity_key, features, timestamp)
435461
)
436462
if progress:
437463
progress(1)
@@ -675,3 +701,18 @@ def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None
675701
region, endpoint_url
676702
)
677703
return self._dynamodb_resource
704+
705+
706+
def _to_write_item(config, entity_key, features, timestamp):
707+
entity_id = compute_entity_id(
708+
entity_key,
709+
entity_key_serialization_version=config.entity_key_serialization_version,
710+
)
711+
return {
712+
"entity_id": entity_id, # PartitionKey
713+
"event_ts": str(utils.make_tzaware(timestamp)),
714+
"values": {
715+
k: v.SerializeToString()
716+
for k, v in features.items() # Serialized Features
717+
},
718+
}

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,6 @@ def get_table_column_names_and_types_from_data_source(
521521

522522
async def initialize(self, config: RepoConfig) -> None:
523523
await self.online_store.initialize(config)
524-
await self.offline_store.initialize(config)
525524

526525
async def close(self) -> None:
527526
await self.online_store.close()
528-
await self.offline_store.close()

0 commit comments

Comments
 (0)