Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9e0e7c7
Add support for DynamoDB and S3 registry
leonid133 Apr 27, 2021
44aadd8
rcu and wcu as a parameter of dynamodb online store
leonid133 Apr 27, 2021
6383791
fix linter
leonid133 May 4, 2021
73ff67a
aws dependency to extras
leonid133 May 18, 2021
aa6d0da
FEAST_S3_ENDPOINT_URL
leonid133 May 18, 2021
0a87050
tests
leonid133 May 18, 2021
3b8bb31
merge from master
leonid133 May 18, 2021
00e8675
fix signature, after merge
leonid133 May 18, 2021
6a99cd9
aws default region name configurable
leonid133 May 18, 2021
32dc799
merge from master
leonid133 Jun 11, 2021
db616c4
add offlinestore config type to test
leonid133 Jun 11, 2021
8dcbd5a
review changes
leonid133 Jun 11, 2021
fee93dd
merge from master
leonid133 Jun 18, 2021
2bbe268
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
5d33a79
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
24c44ee
merge latest from master
leonid133 Jun 23, 2021
7b99cde
review requested changes
leonid133 Jun 23, 2021
3a985b0
integration test for Dynamo
leonid133 Jun 23, 2021
6973581
change the rest of table_name to table_instance (where table_name is …
leonid133 Jun 28, 2021
e928424
fix DynamoDBOnlineStore commit
leonid133 Jun 28, 2021
59d7e4c
move client to _initialize_dynamodb
leonid133 Jun 28, 2021
594b932
rename document_id to entity_id and Row to entity_id
leonid133 Jun 28, 2021
15a787c
The default value is None
leonid133 Jun 28, 2021
7eaa654
Remove Datastore from the docstring.
leonid133 Jun 28, 2021
1468117
get rid of the return call from S3RegistryStore
leonid133 Jun 28, 2021
5dbe429
merge two exceptions
leonid133 Jun 29, 2021
986d45e
For ci requirement
leonid133 Jun 29, 2021
79d85c7
remove configuration from test
leonid133 Jun 29, 2021
f50b2fb
feast-integration-tests for tests
leonid133 Jun 29, 2021
509c521
change test path
leonid133 Jun 29, 2021
cd67973
add fixture feature_store_with_s3_registry to test
leonid133 Jun 29, 2021
5466d20
merge from master
leonid133 Jun 29, 2021
3d1b78c
region required
leonid133 Jun 29, 2021
ff8d635
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 29, 2021
57a607c
Address the rest of the comments
Jul 2, 2021
e9422ea
Merge branch 'master' into feature/online_dynamodb
Jul 2, 2021
3cd9597
Update to_table to to_arrow
Jul 2, 2021
124b337
Merge branch 'master' into feature/online_dynamodb
Jul 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
…ure/online_dynamodb

Signed-off-by: lblokhin <lenin133@yandex.ru>
  • Loading branch information
leonid133 committed Jun 23, 2021
commit 5d33a790e1b7c8be6dc704afae54f6e89db5890e
136 changes: 136 additions & 0 deletions sdk/python/feast/infra/aws_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from datetime import datetime
Comment thread
leonid133 marked this conversation as resolved.
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
from tqdm import tqdm

from feast import FeatureTable, utils
Comment thread
tsotnet marked this conversation as resolved.
Outdated
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig


class AwsProvider(Provider):
    """Provider that forwards online and offline work to the configured stores.

    The offline and online stores are resolved once, from the repo config
    passed to the constructor, and every infrastructure / read / write call
    is delegated to them.
    """

    def __init__(self, config: RepoConfig):
        """Keep ``config`` and build the offline and online stores it names."""
        self.repo_config = config
        self.offline_store = get_offline_store_from_config(config.offline_store)
        self.online_store = get_online_store_from_config(config.online_store)

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """Ask the online store to reconcile its tables and entities.

        ``project`` is part of the provider interface but is not used here:
        the online store receives the stored repo config instead.
        """
        self.online_store.update(
            config=self.repo_config,
            tables_to_delete=tables_to_delete,
            tables_to_keep=tables_to_keep,
            entities_to_keep=entities_to_keep,
            entities_to_delete=entities_to_delete,
            partial=partial,
        )

    def teardown_infra(
        self,
        project: str,
        tables: Sequence[Union[FeatureTable, FeatureView]],
        entities: Sequence[Entity],
    ) -> None:
        """Ask the online store to tear down ``tables`` and ``entities``.

        ``project`` is accepted for interface compatibility and is unused.
        """
        self.online_store.teardown(self.repo_config, tables, entities)

    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """Write ``data`` for ``table`` through the online store.

        ``progress`` is handed to the online store unchanged.
        """
        self.online_store.online_write_batch(config, table, data, progress)

    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Read the rows for ``entity_keys`` from the online store.

        Returns whatever the online store returns: one
        ``(timestamp, values)`` pair per the annotated return type.
        """
        # Forward requested_features so the caller's feature filter is not
        # silently dropped on the way to the store.
        return self.online_store.online_read(
            config, table, entity_keys, requested_features
        )

    def materialize_single_feature_view(
        self,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        """Copy one feature view's latest rows from the offline to the online store.

        Steps: look up the view's entities in ``registry``, pull the latest
        rows between ``start_date`` and ``end_date`` from the offline store,
        apply the input's field mapping when one is set, convert the rows to
        protos, write them to the online store, then record the interval on
        the feature view and re-apply it to the registry.
        """
        entities = [
            registry.get_entity(entity_name, project)
            for entity_name in feature_view.entities
        ]

        (
            join_key_columns,
            feature_name_columns,
            event_timestamp_column,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        # Normalise both bounds before they are used for the pull and
        # recorded as the materialization interval.
        start_date = utils.make_tzaware(start_date)
        end_date = utils.make_tzaware(end_date)

        table = self.offline_store.pull_latest_from_table_or_query(
            data_source=feature_view.input,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

        if feature_view.input.field_mapping is not None:
            table = _run_field_mapping(table, feature_view.input.field_mapping)

        join_keys = [entity.join_key for entity in entities]
        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

        with tqdm_builder(len(rows_to_write)) as pbar:
            self.online_write_batch(
                self.repo_config, feature_view, rows_to_write, pbar.update
            )

        # Only record the interval once the write above has completed.
        feature_view.materialization_intervals.append((start_date, end_date))
        registry.apply_feature_view(feature_view, project)

    @staticmethod
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pandas.DataFrame, str],
        registry: Registry,
        project: str,
    ) -> RetrievalJob:
        """Return a retrieval job for historical features from the offline store.

        The method is static, so the offline store is resolved from
        ``config`` rather than from an instance attribute.
        """
        # NOTE(review): assumes the offline store exposes
        # get_historical_features with these keyword parameters -- confirm
        # against the OfflineStore interface.
        offline_store = get_offline_store_from_config(config.offline_store)
        return offline_store.get_historical_features(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=entity_df,
            registry=registry,
            project=project,
        )
Comment thread
tsotnet marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -1,73 +1,53 @@
import os
# Copyright 2021 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
Comment thread
tsotnet marked this conversation as resolved.
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import mmh3
import pandas
try:
import boto3
from botocore.exceptions import ClientError
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError
raise FeastExtrasDependencyImportError("aws", str(e))
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast import Entity, FeatureTable, FeatureView, utils
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import DynamoDbOnlineStoreConfig, RepoConfig

try:
import boto3
from botocore.exceptions import ClientError
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

class AwsDynamodbProvider(Provider):
_wcu: int
_rcu: int

def __init__(self, config: RepoConfig):
assert isinstance(config.online_store, DynamoDbOnlineStoreConfig)
if config and config.online_store and config.online_store.rcu:
self._rcu = config.online_store.rcu
else:
self._rcu = 5

if config and config.online_store and config.online_store.wcu:
self._wcu = config.online_store.wcu
else:
self._wcu = 5
raise FeastExtrasDependencyImportError("aws", str(e))

if config and config.online_store and config.online_store.region_name:
self.region_name = config.online_store.region_name
os.environ["AWS_DEFAULT_REGION"] = self.region_name
else:
self.region_name = os.environ.get("AWS_DEFAULT_REGION", "us-west-2")
os.environ["AWS_DEFAULT_REGION"] = self.region_name
assert config.offline_store is not None
self.offline_store = get_offline_store_from_config(config.offline_store)

def _initialize_dynamodb(self):
return boto3.resource("dynamodb", region_name=self.region_name)
class DynamodbOnlineStore(OnlineStore):
def _initialize_dynamodb(self, online_config: DynamoDbOnlineStoreConfig):
return boto3.resource("dynamodb", region_name=online_config.region_name)
Comment thread
tsotnet marked this conversation as resolved.
Outdated

def update_infra(
def update(
self,
project: str,
config: RepoConfig,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
dynamodb = self._initialize_dynamodb()
online_config = config.online_store
assert isinstance(online_config, DynamoDbOnlineStoreConfig)
dynamodb = self._initialize_dynamodb(online_config)

for table_name in tables_to_keep:
Comment thread
leonid133 marked this conversation as resolved.
Outdated
table = None
Comment thread
leonid133 marked this conversation as resolved.
Outdated
Expand All @@ -83,8 +63,8 @@ def update_infra(
{"AttributeName": "Project", "AttributeType": "S"},
],
ProvisionedThroughput={
"ReadCapacityUnits": self._rcu,
"WriteCapacityUnits": self._wcu,
"ReadCapacityUnits": online_config.rcu,
"WriteCapacityUnits": online_config.wcu,
},
Comment thread
leonid133 marked this conversation as resolved.
Outdated
)
table.meta.client.get_waiter("table_exists").wait(
Expand All @@ -99,28 +79,32 @@ def update_infra(
table = dynamodb.Table(table_name.name)
table.delete()

def teardown_infra(
def teardown(
self,
project: str,
config: RepoConfig,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
dynamodb = self._initialize_dynamodb()
):
online_config = config.online_store
assert isinstance(online_config, DynamoDbOnlineStoreConfig)
dynamodb = self._initialize_dynamodb(online_config)

for table_name in tables:
table = dynamodb.Table(table_name)
table.delete()

def online_write_batch(
self,
project: str,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
dynamodb = self._initialize_dynamodb()
online_config = config.online_store
assert isinstance(online_config, DynamoDbOnlineStoreConfig)
dynamodb = self._initialize_dynamodb(online_config)

table_instance = dynamodb.Table(table.name)
with table_instance.batch_writer() as batch:
Expand All @@ -130,7 +114,7 @@ def online_write_batch(
batch.put_item(
Item={
"Row": document_id, # PartitionKey
Comment thread
tsotnet marked this conversation as resolved.
Outdated
"Project": project, # SortKey
"Project": config.project, # SortKey
"event_ts": str(utils.make_tzaware(timestamp)),
"values": {
k: v.SerializeToString()
Expand All @@ -141,18 +125,21 @@ def online_write_batch(

def online_read(
self,
project: str,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
dynamodb = self._initialize_dynamodb()
online_config = config.online_store
assert isinstance(online_config, DynamoDbOnlineStoreConfig)
dynamodb = self._initialize_dynamodb(online_config)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
table_instace = dynamodb.Table(table.name)
document_id = compute_datastore_entity_id(entity_key) # TODO check id
Comment thread
leonid133 marked this conversation as resolved.
Outdated
response = table_instace.get_item(
Key={"Row": document_id, "Project": project}
Key={"Row": document_id, "Project": config.project}
)
value = response["Item"]
Comment thread
tsotnet marked this conversation as resolved.
Outdated

Expand All @@ -167,65 +154,6 @@ def online_read(
result.append((None, None))
return result

def materialize_single_feature_view(
self,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))

(
join_key_columns,
feature_name_columns,
event_timestamp_column,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

table = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

@staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
) -> RetrievalJob:
# TODO implement me
pass


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
Comment thread
leonid133 marked this conversation as resolved.
Outdated
"""
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.