Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9e0e7c7
Add support for DynamoDB and S3 registry
leonid133 Apr 27, 2021
44aadd8
rcu and wcu as a parameter of dynamodb online store
leonid133 Apr 27, 2021
6383791
fix linter
leonid133 May 4, 2021
73ff67a
aws dependency to extras
leonid133 May 18, 2021
aa6d0da
FEAST_S3_ENDPOINT_URL
leonid133 May 18, 2021
0a87050
tests
leonid133 May 18, 2021
3b8bb31
merge from master
leonid133 May 18, 2021
00e8675
fix signature, after merge
leonid133 May 18, 2021
6a99cd9
aws default region name configurable
leonid133 May 18, 2021
32dc799
merge from master
leonid133 Jun 11, 2021
db616c4
add offlinestore config type to test
leonid133 Jun 11, 2021
8dcbd5a
review changes
leonid133 Jun 11, 2021
fee93dd
merge from master
leonid133 Jun 18, 2021
2bbe268
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
5d33a79
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
24c44ee
merge latest from master
leonid133 Jun 23, 2021
7b99cde
review requested changes
leonid133 Jun 23, 2021
3a985b0
integration test for Dynamo
leonid133 Jun 23, 2021
6973581
change the rest of table_name to table_instance (where table_name is …
leonid133 Jun 28, 2021
e928424
fix DynamoDBOnlineStore commit
leonid133 Jun 28, 2021
59d7e4c
move client to _initialize_dynamodb
leonid133 Jun 28, 2021
594b932
rename document_id to entity_id and Row to entity_id
leonid133 Jun 28, 2021
15a787c
The default value is None
leonid133 Jun 28, 2021
7eaa654
Remove Datastore from the docstring.
leonid133 Jun 28, 2021
1468117
get rid of the return call from S3RegistryStore
leonid133 Jun 28, 2021
5dbe429
merge two exceptions
leonid133 Jun 29, 2021
986d45e
For ci requirement
leonid133 Jun 29, 2021
79d85c7
remove configuration from test
leonid133 Jun 29, 2021
f50b2fb
feast-integration-tests for tests
leonid133 Jun 29, 2021
509c521
change test path
leonid133 Jun 29, 2021
cd67973
add fixture feature_store_with_s3_registry to test
leonid133 Jun 29, 2021
5466d20
merge from master
leonid133 Jun 29, 2021
3d1b78c
region required
leonid133 Jun 29, 2021
ff8d635
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 29, 2021
57a607c
Address the rest of the comments
Jul 2, 2021
e9422ea
Merge branch 'master' into feature/online_dynamodb
Jul 2, 2021
3cd9597
Update to_table to to_arrow
Jul 2, 2021
124b337
Merge branch 'master' into feature/online_dynamodb
Jul 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge latest from master
Signed-off-by: lblokhin <lenin133@yandex.ru>
  • Loading branch information
leonid133 committed Jun 23, 2021
commit 24c44ee77fbe466e49391bb1c8892fe8902b7587
29 changes: 25 additions & 4 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import mmh3
from pydantic import PositiveInt, StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, utils
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import DynamoDbOnlineStoreConfig, RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig

try:
import boto3
Expand All @@ -32,7 +34,23 @@
raise FeastExtrasDependencyImportError("aws", str(e))


class DynamodbOnlineStore(OnlineStore):
class DynamoDbOnlineStoreConfig(FeastConfigBaseModel):
Comment thread
tsotnet marked this conversation as resolved.
Outdated
"""Online store config for DynamoDB store"""

type: Literal["dynamodb"] = "dynamodb"
"""Online store type selector"""

rcu: Optional[PositiveInt] = 5
""" Read capacity unit """

wcu: Optional[PositiveInt] = 5
""" Write capacity unit """
Comment thread
leonid133 marked this conversation as resolved.
Outdated

region_name: Optional[StrictStr] = None
""" AWS Region Name """
Comment thread
leonid133 marked this conversation as resolved.
Outdated


class DynamoDbOnlineStore(OnlineStore):
Comment thread
leonid133 marked this conversation as resolved.
Outdated
def _initialize_dynamodb(self, online_config: DynamoDbOnlineStoreConfig):
return boto3.resource("dynamodb", region_name=online_config.region_name)

Expand Down Expand Up @@ -90,8 +108,11 @@ def teardown(
dynamodb = self._initialize_dynamodb(online_config)

for table_name in tables:
table = dynamodb.Table(table_name)
table.delete()
try:
table = dynamodb.Table(table_name)
Comment thread
leonid133 marked this conversation as resolved.
Outdated
table.delete()
except Exception as e:
print(str(e))
Comment thread
leonid133 marked this conversation as resolved.
Outdated

def online_write_batch(
self,
Expand Down
58 changes: 10 additions & 48 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.repo_config import (
DatastoreOnlineStoreConfig,
DynamoDbOnlineStoreConfig,
OnlineStoreConfig,
RedisOnlineStoreConfig,
SqliteOnlineStoreConfig,
)


def get_online_store_from_config(online_store_config: Any,) -> OnlineStore:
Expand All @@ -31,47 +24,16 @@ def get_online_store_from_config(online_store_config: Any,) -> OnlineStore:
# So we should include the original error as well in the stack trace.
raise errors.FeastModuleImportError(module_name, "OnlineStore") from e

return SqliteOnlineStore()
elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
from feast.infra.online_stores.datastore import DatastoreOnlineStore

return DatastoreOnlineStore()
elif isinstance(online_store_config, RedisOnlineStoreConfig):
from feast.infra.online_stores.redis import RedisOnlineStore

return RedisOnlineStore()
elif isinstance(online_store_config, DynamoDbOnlineStoreConfig):
from feast.infra.online_stores.dynamodb import DynamodbOnlineStore

return DynamodbOnlineStore()
raise ValueError(f"Unsupported offline store config '{online_store_config}'")


SUPPORTED_SOURCES: Dict[Any, Set[Any]] = {
SqliteOnlineStoreConfig: {FileSource},
DatastoreOnlineStoreConfig: {BigQuerySource},
RedisOnlineStoreConfig: {FileSource, BigQuerySource},
DynamoDbOnlineStoreConfig: {FileSource, BigQuerySource},
}


def assert_online_store_supports_data_source(
online_store_config: OnlineStoreConfig, data_source: DataSource
):
supported_sources: Set[Any] = SUPPORTED_SOURCES.get(
online_store_config.__class__, set()
)
# This is needed because checking for `in` with Union types breaks mypy.
# https://github.com/python/mypy/issues/4954
# We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]`
# Once ^ is resolved.
if supported_sources:
for source in supported_sources:
if source == data_source.__class__:
return
raise FeastOnlineStoreUnsupportedDataSource(
online_store_config.type, data_source.__class__.__name__
)
# Try getting the provider class definition
try:
online_store_class = getattr(module, store_class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastClassImportError(
module_name, store_class_name, class_type="OnlineStore"
) from None
return online_store_class()


def _redis_key(project: str, entity_key: EntityKeyProto):
Expand Down
114 changes: 6 additions & 108 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"sqlite": "feast.infra.online_stores.sqlite.SqliteOnlineStore",
"datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore",
"redis": "feast.infra.online_stores.redis.RedisOnlineStore",
"dynamodb": "feast.infra.online_stores.dynamodb.DynamoDbOnlineStore",
Comment thread
tsotnet marked this conversation as resolved.
Outdated
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand All @@ -40,98 +41,6 @@ class Config:
extra = "forbid"


class SqliteOnlineStoreConfig(FeastBaseModel):
""" Online store config for local (SQLite-based) store """

type: Literal["sqlite"] = "sqlite"
""" Online store type selector"""

path: StrictStr = "data/online.db"
""" (optional) Path to sqlite db """


class DatastoreOnlineStoreConfig(FeastBaseModel):
""" Online store config for GCP Datastore """

type: Literal["datastore"] = "datastore"
""" Online store type selector"""

project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

namespace: Optional[StrictStr] = None
""" (optional) Datastore namespace """

write_concurrency: Optional[PositiveInt] = 40
""" (optional) Amount of threads to use when writing batches of feature rows into Datastore"""

write_batch_size: Optional[PositiveInt] = 50
""" (optional) Amount of feature rows per batch being written into Datastore"""


class RedisType(str, Enum):
redis = "redis"
redis_cluster = "redis_cluster"


class RedisOnlineStoreConfig(FeastBaseModel):
"""Online store config for Redis store"""

type: Literal["redis"] = "redis"
"""Online store type selector"""

redis_type: RedisType = RedisType.redis
"""Redis type: redis or redis_cluster"""

connection_string: StrictStr = "localhost:6379"
"""Connection string containing the host, port, and configuration parameters for Redis
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """


class DynamoDbOnlineStoreConfig(FeastBaseModel):
"""Online store config for DynamoDB store"""

type: Literal["dynamodb"] = "dynamodb"
"""Online store type selector"""

rcu: Optional[PositiveInt] = 5
""" Read capacity unit """

wcu: Optional[PositiveInt] = 5
""" Write capacity unit """

region_name: Optional[StrictStr] = None
""" AWS Region Name """


OnlineStoreConfig = Union[
DatastoreOnlineStoreConfig,
SqliteOnlineStoreConfig,
RedisOnlineStoreConfig,
DynamoDbOnlineStoreConfig,
]


class FileOfflineStoreConfig(FeastBaseModel):
""" Offline store config for local (file-based) store """

type: Literal["file"] = "file"
""" Offline store type selector"""


class BigQueryOfflineStoreConfig(FeastBaseModel):
""" Offline store config for GCP BigQuery """

type: Literal["bigquery"] = "bigquery"
""" Offline store type selector"""

dataset: StrictStr = "feast"
""" (optional) BigQuery Dataset name for temporary tables """


OfflineStoreConfig = Union[FileOfflineStoreConfig, BigQueryOfflineStoreConfig]


class RegistryConfig(FeastBaseModel):
""" Metadata Store Configuration. Configuration that relates to reading from and writing to the Feast registry."""

Expand All @@ -158,7 +67,7 @@ class RepoConfig(FeastBaseModel):
"""

provider: StrictStr
""" str: local or gcp or redis or aws """
""" str: local or gcp or aws """

online_store: Any
""" OnlineStoreConfig: Online store configuration (optional depending on provider) """
Expand Down Expand Up @@ -218,21 +127,10 @@ def _validate_online_store_config(cls, values):

online_store_type = values["online_store"]["type"]

# Make sure the user hasn't provided the wrong type
assert online_store_type in ["datastore", "sqlite", "redis", "dynamodb"]

# Validate the dict to ensure one of the union types match
try:
if online_store_type == "sqlite":
SqliteOnlineStoreConfig(**values["online_store"])
elif online_store_type == "datastore":
DatastoreOnlineStoreConfig(**values["online_store"])
elif online_store_type == "redis":
RedisOnlineStoreConfig(**values["online_store"])
elif online_store_type == "dynamodb":
DynamoDbOnlineStoreConfig(**values["online_store"])
else:
raise ValueError(f"Invalid online store type {online_store_type}")
online_config_class = get_online_config_from_type(online_store_type)
online_config_class(**values["online_store"])
except ValidationError as e:
raise ValidationError(
[ErrorWrapper(e, loc="online_store")], model=RepoConfig,
Expand All @@ -257,10 +155,10 @@ def _validate_offline_store_config(cls, values):
if "type" not in values["offline_store"]:
if values["provider"] == "local":
values["offline_store"]["type"] = "file"
elif values["provider"] == "aws":
values["offline_store"]["type"] = "file"
elif values["provider"] == "gcp":
values["offline_store"]["type"] = "bigquery"
elif values["provider"] == "aws":
values["offline_store"]["type"] = "file"

offline_store_type = values["offline_store"]["type"]

Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.