Merged
Changes from 1 commit
44 commits
21e47d2
test
mavysavydav Jun 12, 2021
06e0c77
refactored existing tests to test full_feature_names feature on data …
Mwad22 Jun 16, 2021
4b7dd18
removed full_feature_names usage from quickstart and README to have m…
Mwad22 Jun 16, 2021
579e08f
Update CHANGELOG for Feast v0.10.8
Jun 17, 2021
462da43
GitBook: [master] 2 pages modified
achals Jun 17, 2021
df95ee8
Schema Inferencing should happen at apply time (#1646)
mavysavydav Jun 18, 2021
e383575
GitBook: [master] 80 pages modified
woop Jun 19, 2021
dd25ad6
GitBook: [master] 80 pages modified
woop Jun 20, 2021
cef2869
Provide descriptive error on invalid table reference (#1627)
codyjlin Jun 21, 2021
c2e2b4d
Refactor OnlineStoreConfig classes into owning modules (#1649)
achals Jun 21, 2021
d2cda24
Possibility to specify a project for BigQuery queries (#1656)
MattDelac Jun 21, 2021
4ab4c60
Refactor OfflineStoreConfig classes into their owning modules (#1657)
achals Jun 22, 2021
64a2cb5
Run python unit tests in parallel (#1652)
achals Jun 22, 2021
9e4c907
Rename telemetry to usage (#1660)
Jun 22, 2021
b951282
resolved final comments on PR (variable renaming, refactor tests)
Mwad22 Jun 23, 2021
a68b12b
reformatted after merge conflict
Mwad22 Jun 23, 2021
094dbf3
Update CHANGELOG for Feast v0.11.0
woop Jun 24, 2021
0a148f9
Update charts README (#1659)
szalai1 Jun 25, 2021
0ce8210
Added Redis to list of online stores for local provider in providers …
nels Jun 25, 2021
d71e4c5
Grouped inferencing statements together in apply methods for easier r…
mavysavydav Jun 25, 2021
c14023f
Add RedshiftDataSource (#1669)
Jun 28, 2021
d138648
Provide the user with more options for setting the to_bigquery config…
codyjlin Jun 28, 2021
c02b9eb
Add streaming sources to the FeatureView API (#1664)
achals Jun 28, 2021
12dbbea
Add to_table() to RetrievalJob object (#1663)
MattDelac Jun 29, 2021
d0fe0a9
Rename to_table to to_arrow (#1671)
MattDelac Jun 29, 2021
6e8670e
Cancel BigQuery job if timeout hits (#1672)
MattDelac Jun 29, 2021
5314024
Fix Feature References example (#1674)
GregKuhlmann Jun 30, 2021
eb1da5e
Allow strings for online/offline store instead of dicts (#1673)
achals Jun 30, 2021
183a0b9
Remove default list from the FeatureView constructor (#1679)
achals Jul 1, 2021
b714a12
made changes requested by @tsotnet
Mwad22 Jul 2, 2021
c78894f
Fix unit tests that got broken by Pandas 1.3.0 release (#1683)
Jul 3, 2021
20c9461
Add support for DynamoDB and S3 registry (#1483)
leonid133 Jul 3, 2021
d36d1a0
Parallelize integration tests (#1684)
Jul 4, 2021
651bce3
BQ exception should be raised first before we check the timedout (#1675)
MattDelac Jul 5, 2021
f3b92c3
Update sdk/python/feast/infra/provider.py
Mwad22 Jul 5, 2021
f400d65
Update sdk/python/feast/feature_store.py
Mwad22 Jul 5, 2021
082fca7
made error logic/messages more descriptive
Mwad22 Jul 5, 2021
3aca976
made error logic/messages more descriptive.
Mwad22 Jul 5, 2021
79aa736
Simplified error messages
Mwad22 Jul 6, 2021
d7d08ef
ran formatter, issue in errors.py
Mwad22 Jul 7, 2021
2ab8eea
Merge branch 'master' into mwad22-1618-PR
Mwad22 Jul 7, 2021
650340d
python linter issues resolved
Mwad22 Jul 7, 2021
5d582a6
removed unnecessary default assignment in get_historical_features. de…
Mwad22 Jul 8, 2021
8724e0b
added error message assertion for feature name collisions, and other …
Mwad22 Jul 8, 2021
Add support for DynamoDB and S3 registry (#1483)
* Add support for DynamoDB and S3 registry

Signed-off-by: lblokhin <lenin133@yandex.ru>

* rcu and wcu as a parameter of dynamodb online store

Signed-off-by: lblokhin <lenin133@yandex.ru>

* fix linter

Signed-off-by: lblokhin <lenin133@yandex.ru>

* aws dependency to extras

Signed-off-by: lblokhin <lenin133@yandex.ru>

* FEAST_S3_ENDPOINT_URL

Signed-off-by: lblokhin <lenin133@yandex.ru>

* tests

Signed-off-by: lblokhin <lenin133@yandex.ru>

* fix signature, after merge

Signed-off-by: lblokhin <lenin133@yandex.ru>

* aws default region name configurable

Signed-off-by: lblokhin <lenin133@yandex.ru>

* add offlinestore config type to test

Signed-off-by: lblokhin <lenin133@yandex.ru>

* review changes

Signed-off-by: lblokhin <lenin133@yandex.ru>

* review requested changes

Signed-off-by: lblokhin <lenin133@yandex.ru>

* integration test for Dynamo

Signed-off-by: lblokhin <lenin133@yandex.ru>

* change the rest of table_name to table_instance (where table_name is actually an instance of DynamoDB Table object)

Signed-off-by: lblokhin <lenin133@yandex.ru>

* fix DynamoDBOnlineStore commit

Signed-off-by: lblokhin <lenin133@yandex.ru>

* move client to _initialize_dynamodb

Signed-off-by: lblokhin <lenin133@yandex.ru>

* rename document_id to entity_id and Row to entity_id

Signed-off-by: lblokhin <lenin133@yandex.ru>

* The default value is None

Signed-off-by: lblokhin <lenin133@yandex.ru>

* Remove Datastore from the docstring.

Signed-off-by: lblokhin <lenin133@yandex.ru>

* get rid of the return call from S3RegistryStore

Signed-off-by: lblokhin <lenin133@yandex.ru>

* merge two exceptions

Signed-off-by: lblokhin <lenin133@yandex.ru>

* For ci requirement

Signed-off-by: lblokhin <lenin133@yandex.ru>

* remove configuration from test

Signed-off-by: lblokhin <lenin133@yandex.ru>

* feast-integration-tests for tests

Signed-off-by: lblokhin <lenin133@yandex.ru>

* change test path

Signed-off-by: lblokhin <lenin133@yandex.ru>

* add fixture feature_store_with_s3_registry to test

Signed-off-by: lblokhin <lenin133@yandex.ru>

* region required

Signed-off-by: lblokhin <lenin133@yandex.ru>

* Address the rest of the comments

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Update to_table to to_arrow

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

Co-authored-by: Tsotne Tabidze <tsotne@tecton.ai>
Signed-off-by: Mwad22 <51929507+Mwad22@users.noreply.github.com>
2 people authored and Mwad22 committed Jul 7, 2021
commit 20c94613b677aef66ab39aac277b23bdef3ea06d
Binary file added docs/specs/dynamodb_online_example.monopic
Binary file not shown.
Binary file added docs/specs/dynamodb_online_example.png
2 changes: 1 addition & 1 deletion docs/specs/online_store_format.md
@@ -7,7 +7,7 @@ This format is considered part of Feast public API contract; that allows other c

 The format is not entirely technology or cloud agnostic. Since users may opt to use different key-value stores as an underlying engine to store feature data, and we don't want to aim for the lowest common denominator across them, we have to provide different "flavors" of this data format, specialized for every supported store.

-This version of the Online Store Format supports only Redis as the underlying storage engine. We envision adding more storage engines to this document in the future.
+This version of the Online Store Format supports Redis and DynamoDB as storage engine. We envision adding more storage engines to this document in the future.


 ## Overview
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -280,7 +280,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
 @click.option(
     "--template",
     "-t",
-    type=click.Choice(["local", "gcp"], case_sensitive=False),
+    type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
     help="Specify a template for the created project",
     default="local",
 )
10 changes: 10 additions & 0 deletions sdk/python/feast/errors.py
@@ -40,6 +40,16 @@ def __init__(self, name, project=None):
         super().__init__(f"Feature table {name} does not exist")


+class S3RegistryBucketNotExist(FeastObjectNotFoundException):
+    def __init__(self, bucket):
+        super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")
+
+
+class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
+    def __init__(self, bucket):
+        super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")
+
+
 class FeastProviderLoginError(Exception):
     """Error class that indicates a user has not authenticated with their provider."""

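The two exception classes added above only carry a bucket name; this part of the diff does not show where they are raised. As a rough illustration, the sketch below assumes a caller that has already checked the bucket and holds an HTTP-style status code. `raise_for_bucket_status` and `describe` are hypothetical helpers, and `FeastObjectNotFoundException` is redefined here as a stand-in so the snippet runs without Feast installed.

```python
class FeastObjectNotFoundException(Exception):
    """Stand-in for the base class in feast.errors (redefined so the sketch is self-contained)."""


class S3RegistryBucketNotExist(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")


class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")


def raise_for_bucket_status(bucket: str, status_code: int) -> None:
    """Hypothetical helper: map a status code from a bucket check to a Feast error."""
    if status_code == 404:
        raise S3RegistryBucketNotExist(bucket)
    if status_code == 403:
        raise S3RegistryBucketForbiddenAccess(bucket)
    # Any other status is treated as "no registry-bucket problem" in this sketch.


def describe(bucket: str, status_code: int) -> str:
    """Hypothetical helper: return the error message, or "ok" if nothing was raised."""
    try:
        raise_for_bucket_status(bucket, status_code)
    except FeastObjectNotFoundException as err:
        return str(err)
    return "ok"
```

Because both classes share a base class, a caller can catch `FeastObjectNotFoundException` once and still get a message that distinguishes a missing bucket from a forbidden one.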
141 changes: 141 additions & 0 deletions sdk/python/feast/infra/aws.py
@@ -0,0 +1,141 @@
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+
+import pandas
+from tqdm import tqdm
+
+from feast import FeatureTable
+from feast.entity import Entity
+from feast.feature_view import FeatureView
+from feast.infra.offline_stores.helpers import get_offline_store_from_config
+from feast.infra.online_stores.helpers import get_online_store_from_config
+from feast.infra.provider import (
+    Provider,
+    RetrievalJob,
+    _convert_arrow_to_proto,
+    _get_column_names,
+    _run_field_mapping,
+)
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.registry import Registry
+from feast.repo_config import RepoConfig
+
+
+class AwsProvider(Provider):
+    def __init__(self, config: RepoConfig):
+        self.repo_config = config
+        self.offline_store = get_offline_store_from_config(config.offline_store)
+        self.online_store = get_online_store_from_config(config.online_store)
+
+    def update_infra(
+        self,
+        project: str,
+        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
+        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        self.online_store.update(
+            config=self.repo_config,
+            tables_to_delete=tables_to_delete,
+            tables_to_keep=tables_to_keep,
+            entities_to_keep=entities_to_keep,
+            entities_to_delete=entities_to_delete,
+            partial=partial,
+        )
+
+    def teardown_infra(
+        self,
+        project: str,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+        entities: Sequence[Entity],
+    ) -> None:
+        self.online_store.teardown(self.repo_config, tables, entities)
+
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        self.online_store.online_write_batch(config, table, data, progress)
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        entity_keys: List[EntityKeyProto],
+        requested_features: List[str] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        result = self.online_store.online_read(config, table, entity_keys)
+
+        return result
+
+    def materialize_single_feature_view(
+        self,
+        config: RepoConfig,
+        feature_view: FeatureView,
+        start_date: datetime,
+        end_date: datetime,
+        registry: Registry,
+        project: str,
+        tqdm_builder: Callable[[int], tqdm],
+    ) -> None:
+        entities = []
+        for entity_name in feature_view.entities:
+            entities.append(registry.get_entity(entity_name, project))
+
+        (
+            join_key_columns,
+            feature_name_columns,
+            event_timestamp_column,
+            created_timestamp_column,
+        ) = _get_column_names(feature_view, entities)
+
+        offline_job = self.offline_store.pull_latest_from_table_or_query(
+            config=config,
+            data_source=feature_view.input,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        table = offline_job.to_arrow()
+
+        if feature_view.input.field_mapping is not None:
+            table = _run_field_mapping(table, feature_view.input.field_mapping)
+
+        join_keys = [entity.join_key for entity in entities]
+        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
+
+        with tqdm_builder(len(rows_to_write)) as pbar:
+            self.online_write_batch(
+                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
+            )
+
+    def get_historical_features(
+        self,
+        config: RepoConfig,
+        feature_views: List[FeatureView],
+        feature_refs: List[str],
+        entity_df: Union[pandas.DataFrame, str],
+        registry: Registry,
+        project: str,
+    ) -> RetrievalJob:
+        job = self.offline_store.get_historical_features(
+            config=config,
+            feature_views=feature_views,
+            feature_refs=feature_refs,
+            entity_df=entity_df,
+            registry=registry,
+            project=project,
+        )
+        return job
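`materialize_single_feature_view` above hands `online_write_batch` a `progress` callback that forwards row counts to a tqdm bar. The sketch below illustrates that callback contract with standard-library stand-ins only. `InMemoryOnlineStore`, `CountingBar`, and `materialize` are hypothetical names, not Feast APIs, and calling `progress(1)` once per row is an assumption about how a store might report progress.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Tuple[str, Dict[str, Any]]


class InMemoryOnlineStore:
    """Hypothetical stand-in for an online store; not part of Feast."""

    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, Any]] = {}

    def online_write_batch(
        self, data: List[Row], progress: Optional[Callable[[int], Any]]
    ) -> None:
        for entity_id, features in data:
            self.rows[entity_id] = features
            if progress:
                progress(1)  # assumption: report one written row at a time


class CountingBar:
    """Minimal tqdm-like context manager that only counts updates."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.n = 0

    def __enter__(self) -> "CountingBar":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        return None

    def update(self, n: int) -> None:
        self.n += n


def materialize(
    store: InMemoryOnlineStore,
    rows: List[Row],
    bar_builder: Callable[[int], CountingBar],
) -> CountingBar:
    # Same shape as the provider code: build the bar from the row count,
    # then let the store drive it through the callback.
    with bar_builder(len(rows)) as pbar:
        store.online_write_batch(rows, lambda x: pbar.update(x))
    return pbar
```

Passing a builder rather than a ready-made bar lets the caller decide how progress is displayed while the provider only needs the row count.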
17 changes: 3 additions & 14 deletions sdk/python/feast/infra/online_stores/datastore.py
@@ -16,13 +16,12 @@
 from multiprocessing.pool import ThreadPool
 from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

-import mmh3
 from pydantic import PositiveInt, StrictStr
 from pydantic.typing import Literal

 from feast import Entity, FeatureTable, utils
 from feast.feature_view import FeatureView
-from feast.infra.key_encoding_utils import serialize_entity_key
+from feast.infra.online_stores.helpers import compute_entity_id
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
 from feast.protos.feast.types.Value_pb2 import Value as ValueProto
@@ -191,7 +190,7 @@ def _write_minibatch(
     ):
         entities = []
         for entity_key, features, timestamp, created_ts in data:
-            document_id = compute_datastore_entity_id(entity_key)
+            document_id = compute_entity_id(entity_key)

             key = client.key(
                 "Project", project, "Table", table.name, "Row", document_id,
@@ -236,7 +235,7 @@ def online_read(

         result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
         for entity_key in entity_keys:
-            document_id = compute_datastore_entity_id(entity_key)
+            document_id = compute_entity_id(entity_key)
             key = client.key(
                 "Project", feast_project, "Table", table.name, "Row", document_id
             )
@@ -253,16 +252,6 @@ def online_read(
         return result


-def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
-    """
-    Compute Datastore Entity id given Feast Entity Key.
-
-    Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
-    do with the Entity concept we have in Feast.
-    """
-    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
-
-
 def _delete_all_values(client, key) -> None:
     """
     Delete all data under the key path in datastore.
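The removed `compute_datastore_entity_id` derived a row id by hashing the serialized entity key and hex-encoding the digest; the diff replaces it with a shared `compute_entity_id` imported from `feast.infra.online_stores.helpers`, whose body is not shown here. The sketch below illustrates the general idea with the standard library only. It swaps in `hashlib.blake2b` with a 16-byte digest in place of `mmh3.hash_bytes`, and `serialize_key` is a hypothetical, simplified serializer rather than Feast's `serialize_entity_key`, so the ids it produces will not match Feast's.

```python
import hashlib
from typing import List


def serialize_key(join_keys: List[str], values: List[str]) -> bytes:
    """Hypothetical serializer: sort by join key so argument order does not change the bytes."""
    pairs = sorted(zip(join_keys, values))
    return "|".join(f"{key}={value}" for key, value in pairs).encode("utf-8")


def compute_entity_id_sketch(join_keys: List[str], values: List[str]) -> str:
    """Derive a fixed-length hex id from the serialized key.

    Stand-in hash: blake2b with a 128-bit digest, not the MurmurHash3 used in the diff.
    """
    serialized = serialize_key(join_keys, values)
    return hashlib.blake2b(serialized, digest_size=16).hexdigest()
```

A deterministic id of this kind lets both the write path and the read path address the same row without storing a separate lookup table, which is presumably why the helper can be shared across online stores.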