Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9e0e7c7
Add support for DynamoDB and S3 registry
leonid133 Apr 27, 2021
44aadd8
rcu and wcu as a parameter of dynamodb online store
leonid133 Apr 27, 2021
6383791
fix linter
leonid133 May 4, 2021
73ff67a
aws dependency to extras
leonid133 May 18, 2021
aa6d0da
FEAST_S3_ENDPOINT_URL
leonid133 May 18, 2021
0a87050
tests
leonid133 May 18, 2021
3b8bb31
merge from master
leonid133 May 18, 2021
00e8675
fix signature, after merge
leonid133 May 18, 2021
6a99cd9
aws default region name configurable
leonid133 May 18, 2021
32dc799
merge from master
leonid133 Jun 11, 2021
db616c4
add offlinestore config type to test
leonid133 Jun 11, 2021
8dcbd5a
review changes
leonid133 Jun 11, 2021
fee93dd
merge from master
leonid133 Jun 18, 2021
2bbe268
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
5d33a79
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
24c44ee
merge latest from master
leonid133 Jun 23, 2021
7b99cde
review requested changes
leonid133 Jun 23, 2021
3a985b0
integration test for Dynamo
leonid133 Jun 23, 2021
6973581
change the rest of table_name to table_instance (where table_name is …
leonid133 Jun 28, 2021
e928424
fix DynamoDBOnlineStore commit
leonid133 Jun 28, 2021
59d7e4c
move client to _initialize_dynamodb
leonid133 Jun 28, 2021
594b932
rename document_id to entity_id and Row to entity_id
leonid133 Jun 28, 2021
15a787c
The default value is None
leonid133 Jun 28, 2021
7eaa654
Remove Datastore from the docstring.
leonid133 Jun 28, 2021
1468117
get rid of the return call from S3RegistryStore
leonid133 Jun 28, 2021
5dbe429
merge two exceptions
leonid133 Jun 29, 2021
986d45e
For ci requirement
leonid133 Jun 29, 2021
79d85c7
remove configuration from test
leonid133 Jun 29, 2021
f50b2fb
feast-integration-tests for tests
leonid133 Jun 29, 2021
509c521
change test path
leonid133 Jun 29, 2021
cd67973
add fixture feature_store_with_s3_registry to test
leonid133 Jun 29, 2021
5466d20
merge from master
leonid133 Jun 29, 2021
3d1b78c
region required
leonid133 Jun 29, 2021
ff8d635
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 29, 2021
57a607c
Address the rest of the comments
Jul 2, 2021
e9422ea
Merge branch 'master' into feature/online_dynamodb
Jul 2, 2021
3cd9597
Update to_table to to_arrow
Jul 2, 2021
124b337
Merge branch 'master' into feature/online_dynamodb
Jul 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address the rest of the comments
Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
  • Loading branch information
Tsotne Tabidze committed Jul 2, 2021
commit 57a607cd5c52a3ddc1e799eb24d508c921d361df
65 changes: 41 additions & 24 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ class DynamoDBOnlineStore(OnlineStore):
Online feature store for AWS DynamoDB.
"""

def _initialize_dynamodb(self, online_config: DynamoDBOnlineStoreConfig):
return (
boto3.client("dynamodb", region_name=online_config.region),
boto3.resource("dynamodb", region_name=online_config.region),
)

def update(
self,
config: RepoConfig,
Expand All @@ -65,11 +59,11 @@ def update(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
client, dynamodb = self._initialize_dynamodb(online_config)
dynamodb_client, dynamodb_resource = self._initialize_dynamodb(online_config)

for table_instance in tables_to_keep:
try:
table = dynamodb.create_table(
dynamodb_resource.create_table(
TableName=f"{config.project}.{table_instance.name}",
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[
Expand All @@ -78,18 +72,18 @@ def update(
BillingMode="PAY_PER_REQUEST",
)
except ClientError as ce:
print(ce)
if ce.response["Error"]["Code"] == "ResourceNotFoundException":
table = dynamodb.Table(f"{config.project}.{table_instance.name}")
# If the table creation fails with ResourceInUseException,
# it means the table already exists or is being created.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceInUseException":
raise

for table_instance in tables_to_keep:
client.get_waiter("table_exists").wait(
dynamodb_client.get_waiter("table_exists").wait(
TableName=f"{config.project}.{table_instance.name}"
)

for table_instance in tables_to_delete:
table = dynamodb.Table(f"{config.project}.{table_instance.name}")
table.delete()
self._delete_tables_idempotent(dynamodb_resource, config, tables_to_delete)

def teardown(
self,
Expand All @@ -99,11 +93,9 @@ def teardown(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb = self._initialize_dynamodb(online_config)
_, dynamodb_resource = self._initialize_dynamodb(online_config)

for table_instance in tables:
table = dynamodb.Table(f"{config.project}.{table_instance.name}")
table.delete()
self._delete_tables_idempotent(dynamodb_resource, config, tables)

def online_write_batch(
self,
Expand All @@ -116,9 +108,9 @@ def online_write_batch(
) -> None:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb = self._initialize_dynamodb(online_config)
_, dynamodb_resource = self._initialize_dynamodb(online_config)

table_instance = dynamodb.Table(f"{config.project}.{table.name}")
table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
with table_instance.batch_writer() as batch:
for entity_key, features, timestamp, created_ts in data:
Comment thread
leonid133 marked this conversation as resolved.
entity_id = compute_entity_id(entity_key)
Expand All @@ -144,12 +136,12 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb = self._initialize_dynamodb(online_config)
_, dynamodb_resource = self._initialize_dynamodb(online_config)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
table_instance = dynamodb.Table(f"{config.project}.{table.name}")
entity_id = compute_entity_id(entity_key) # TODO check id
table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
entity_id = compute_entity_id(entity_key)
response = table_instance.get_item(Key={"entity_id": entity_id})
value = response.get("Item")

Expand All @@ -163,3 +155,28 @@ def online_read(
else:
result.append((None, None))
return result

def _initialize_dynamodb(self, online_config: DynamoDBOnlineStoreConfig):
    """Create the boto3 DynamoDB handles for the configured region.

    Args:
        online_config: Online store configuration; only its ``region``
            attribute is read here.

    Returns:
        A ``(client, resource)`` tuple: the result of
        ``boto3.client("dynamodb", ...)`` followed by the result of
        ``boto3.resource("dynamodb", ...)``, both created with
        ``region_name=online_config.region``.
    """
    dynamodb_client = boto3.client("dynamodb", region_name=online_config.region)
    dynamodb_resource = boto3.resource(
        "dynamodb", region_name=online_config.region
    )
    return dynamodb_client, dynamodb_resource

def _delete_tables_idempotent(
    self,
    dynamodb_resource,
    config: RepoConfig,
    tables: Sequence[Union[FeatureTable, FeatureView]],
):
    """Delete the DynamoDB table for each entry in ``tables``.

    Each table is addressed as ``"{config.project}.{name}"`` through
    ``dynamodb_resource.Table(...)`` and removed with ``.delete()``.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; the returned
            table must expose ``delete()``.
        config: Repo configuration; only ``config.project`` is read.
        tables: Feature tables/views whose ``name`` identifies the table.

    Raises:
        ClientError: Re-raised unchanged when the error code is anything
            other than ``ResourceNotFoundException``.
    """
    for feature_table in tables:
        try:
            dynamodb_resource.Table(
                f"{config.project}.{feature_table.name}"
            ).delete()
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            if error_code == "ResourceNotFoundException":
                # The table is already gone, so there is nothing left to
                # delete for this entry; move on to the next one.
                continue
            # Any other failure is unexpected and must surface to the caller.
            raise
12 changes: 6 additions & 6 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,33 +564,33 @@ def get_registry_proto(self):
file_obj = TemporaryFile()
registry_proto = RegistryProto()
try:
import botocore
from botocore.exceptions import ClientError
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("aws", str(e))
try:
bucket = self.s3_client.Bucket(self._bucket)
self.s3_client.meta.client.head_bucket(Bucket=bucket.name)
except botocore.client.ClientError as e:
except ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the bucket does not exist.
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
raise S3RegistryBucketNotExist(self._bucket)
else:
raise S3RegistryBucketForbiddenAccess(self._bucket)
raise S3RegistryBucketForbiddenAccess(self._bucket) from e

try:
obj = bucket.Object(self._key)
obj.download_fileobj(file_obj)
file_obj.seek(0)
registry_proto.ParseFromString(file_obj.read())
return registry_proto
except botocore.exceptions.ClientError as e:
except ClientError as e:
raise FileNotFoundError(
f'Error while trying to locate Registry at path "{self._uri.geturl()}"with [original error]: {e.response}'
)
f"Error while trying to locate Registry at path {self._uri.geturl()}"
) from e

def update_registry_proto(
self, updater: Optional[Callable[[RegistryProto], RegistryProto]] = None
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _validate_offline_store_config(cls, values):
elif values["provider"] == "gcp":
values["offline_store"]["type"] = "bigquery"
elif values["provider"] == "aws":
values["offline_store"]["type"] = "redshift"
values["offline_store"]["type"] = "file"

offline_store_type = values["offline_store"]["type"]

Expand Down
5 changes: 5 additions & 0 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
"redis-py-cluster==2.1.2",
]

AWS_REQUIRED = [
"boto3==1.17.*",
Comment thread
tsotnet marked this conversation as resolved.
]

CI_REQUIRED = [
"cryptography==3.3.2",
"flake8",
Expand Down Expand Up @@ -200,6 +204,7 @@ def run(self):
"dev": ["mypy-protobuf==1.*", "grpcio-testing==1.*"],
"ci": CI_REQUIRED,
"gcp": GCP_REQUIRED,
"aws": AWS_REQUIRED,
"redis": REDIS_REQUIRED,
},
include_package_data=True,
Expand Down
2 changes: 0 additions & 2 deletions sdk/python/tests/test_cli_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ def test_basic() -> None:
online_store:
type: dynamodb
region: us-west-2
offline_store:
type: file
"""
)
)
Expand Down
9 changes: 1 addition & 8 deletions sdk/python/tests/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,9 @@ def feature_store_with_gcs_registry():

@pytest.fixture
def feature_store_with_s3_registry():
Comment thread
tsotnet marked this conversation as resolved.
import boto3

s3 = boto3.resource("s3")
bucket_name = "feast-integration-tests"
bucket = s3.Bucket(bucket_name)
s3.meta.client.head_bucket(Bucket=bucket.name)

return FeatureStore(
config=RepoConfig(
registry=f"s3://{bucket_name}/registries/{int(time.time() * 1000)}/registry.db",
registry=f"s3://feast-integration-tests/registries/{int(time.time() * 1000)}/registry.db",
project="default",
provider="aws",
online_store=DynamoDBOnlineStoreConfig(region="us-west-2"),
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/test_offline_online_store_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
join_key="driver_id",
value_type=ValueType.INT32,
)
with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory():
with tempfile.TemporaryDirectory() as repo_dir_name:
config = RepoConfig(
registry=str(Path(repo_dir_name) / "registry.db"),
project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}",
Expand Down Expand Up @@ -206,7 +206,7 @@ def prep_dynamodb_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
join_key="driver_id",
value_type=ValueType.INT32,
)
with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory():
with tempfile.TemporaryDirectory() as repo_dir_name:
config = RepoConfig(
registry=str(Path(repo_dir_name) / "registry.db"),
project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}",
Expand Down