Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
actually get the test working correctly
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 14, 2022
commit f2927e9f9151f4dbedfd4a311c1387c123e87476
8 changes: 7 additions & 1 deletion sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,13 @@ def update_infra(
self._deploy_feature_server(project, image_uri)

if self.batch_engine:
self.batch_engine.update(project, tables_to_delete, tables_to_keep, entities_to_delete, entities_to_keep)
self.batch_engine.update(
project,
tables_to_delete,
tables_to_keep,
entities_to_delete,
entities_to_keep,
)

def _deploy_feature_server(self, project: str, image_uri: str):
_logger.info("Deploying feature server...")
Expand Down
12 changes: 8 additions & 4 deletions sdk/python/feast/infra/materialization/lambda/lambda_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class LambdaMaterializationEngine(BatchMaterializationEngine):
"""
WARNING: This engine should be considered "Alpha" functionality.
"""

def update(
self,
project: str,
Expand Down Expand Up @@ -101,10 +102,9 @@ def update(
logger.info("Creating lambda function %s, %s", self.lambda_name, r)

logger.info("Waiting for function %s to be active", self.lambda_name)
waiter = self.lambda_client.get_waiter('function_active')
waiter = self.lambda_client.get_waiter("function_active")
waiter.wait(FunctionName=self.lambda_name)


def teardown_infra(
self,
project: str,
Expand Down Expand Up @@ -204,8 +204,12 @@ def _materialize_one(
InvocationType="RequestResponse",
Payload=json.dumps(payload),
)
logger.info(f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}")
print(f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}")
logger.info(
f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}"
)
print(
f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}"
)

return LambdaMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def online_read(
batch_entity_ids = {
table_instance.name: {
"Keys": [{"entity_id": entity_id} for entity_id in batch],
"ConsistentRead": True
"ConsistentRead": True,
}
}
with tracing_span(name="remote_call"):
Expand Down
10 changes: 8 additions & 2 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig, BATCH_ENGINE_CLASS_FOR_TYPE
from feast.repo_config import BATCH_ENGINE_CLASS_FOR_TYPE, RepoConfig
from feast.saved_dataset import SavedDataset
from feast.stream_feature_view import StreamFeatureView
from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute
Expand Down Expand Up @@ -126,7 +126,13 @@ def update_infra(
partial=partial,
)
if self.batch_engine:
self.batch_engine.update(project, tables_to_delete, tables_to_keep, entities_to_delete, entities_to_keep)
self.batch_engine.update(
project,
tables_to_delete,
tables_to_keep,
entities_to_delete,
entities_to_keep,
)

def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ def batch_engine(self):
self._batch_engine_config["type"]
)(**self._batch_engine_config)
elif isinstance(self._online_config, str):
self._batch_engine = get_batch_engine_config_from_type(self._online_config)()
self._batch_engine = get_batch_engine_config_from_type(
self._online_config
)()
elif self._online_config:
self._batch_engine = self._batch_engine
Comment thread
achals marked this conversation as resolved.
Outdated

Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
logger = logging.getLogger(logger_name)
logger.setLevel(level)


def pytest_configure(config):
if platform in ["darwin", "windows"]:
multiprocessing.set_start_method("spawn")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from feast.infra.feature_servers.local_process.config import LocalFeatureServerConfig
from feast.repo_config import RegistryConfig, RepoConfig
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig, RegistryLocation,
IntegrationTestRepoConfig,
RegistryLocation,
)
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
Expand Down Expand Up @@ -381,7 +382,7 @@ def construct_test_environment(
online_creator = None
online_store = test_repo_config.online_store

if (test_repo_config.python_feature_server and test_repo_config.provider == "aws"):
if test_repo_config.python_feature_server and test_repo_config.provider == "aws":
from feast.infra.feature_servers.aws_lambda.config import (
AwsLambdaFeatureServerConfig,
)
Expand All @@ -397,7 +398,9 @@ def construct_test_environment(
)

repo_dir_name = tempfile.mkdtemp()
if (test_repo_config.python_feature_server and test_repo_config.provider == "aws") or test_repo_config.registry_location == RegistryLocation.S3:
if (
test_repo_config.python_feature_server and test_repo_config.provider == "aws"
) or test_repo_config.registry_location == RegistryLocation.S3:
registry: Union[str, RegistryConfig] = (
f"s3://feast-integration-tests/registries/{project}/registry.db"
)
Expand Down
81 changes: 33 additions & 48 deletions sdk/python/tests/integration/materialization/test_lambda.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,54 @@
import time

import math
import time
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import pytest
from pytz import utc

from feast import FeatureStore, FeatureView, Entity, ValueType, RedshiftSource, Feature
from feast import Entity, Feature, FeatureStore, FeatureView, ValueType
from tests.data.data_creator import create_basic_driver_dataset
from tests.integration.feature_repos.integration_test_repo_config import IntegrationTestRepoConfig, RegistryLocation
from tests.integration.feature_repos.repo_configuration import construct_test_environment
from tests.integration.feature_repos.universal.data_sources.redshift import RedshiftDataSourceCreator
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import driver_feature_view
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
RegistryLocation,
)
from tests.integration.feature_repos.repo_configuration import (
construct_test_environment,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
RedshiftDataSourceCreator,
)


@pytest.mark.integration
def test_lambda_materialization():
lambda_config = IntegrationTestRepoConfig(
provider="aws",
online_store={
"type": "dynamodb",
"region": "us-west-2",
},
online_store={"type": "dynamodb", "region": "us-west-2"},
offline_store_creator=RedshiftDataSourceCreator,
batch_engine={
"type": "lambda",
"materialization_image": "402087665549.dkr.ecr.us-west-2.amazonaws.com/feast-lambda-consumer:v1",
"lambda_role": "arn:aws:iam::402087665549:role/lambda_execution_role"
"lambda_role": "arn:aws:iam::402087665549:role/lambda_execution_role",
},
registry_location=RegistryLocation.S3
registry_location=RegistryLocation.S3,
)
lambda_environment = construct_test_environment(lambda_config, None)

# local_config = IntegrationTestRepoConfig(
# online_store={
# "type": "dynamodb",
# "region": "us-west-2",
# },
# offline_store_creator=RedshiftDataSourceCreator,
# batch_engine="local"
# )
# local_environment = construct_test_environment(local_config, None)

df = create_basic_driver_dataset()
ds = lambda_environment.data_source_creator.create_data_source(
df, lambda_environment.feature_store.project, field_mapping={"ts_1": "ts"},
)

fs = lambda_environment.feature_store
driver = Entity(
name="driver_id",
join_key="driver_id",
value_type=ValueType.INT64,
)
driver = Entity(name="driver_id", join_key="driver_id", value_type=ValueType.INT64,)

driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=timedelta(weeks=52),
features=[
Feature(name="value", dtype=ValueType.FLOAT),
],
features=[Feature(name="value", dtype=ValueType.FLOAT)],
batch_source=ds,
)

Expand All @@ -85,13 +70,13 @@ def test_lambda_materialization():


def check_offline_and_online_features(
fs: FeatureStore,
fv: FeatureView,
driver_id: int,
event_timestamp: datetime,
expected_value: Optional[float],
full_feature_names: bool,
check_offline_store: bool = True,
fs: FeatureStore,
fv: FeatureView,
driver_id: int,
event_timestamp: datetime,
expected_value: Optional[float],
full_feature_names: bool,
check_offline_store: bool = True,
) -> None:
# Check online store
response_dict = fs.get_online_features(
Expand All @@ -105,15 +90,15 @@ def check_offline_and_online_features(
if expected_value:
assert response_dict[f"{fv.name}__value"][0], f"Response: {response_dict}"
assert (
abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6
abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6
), f"Response: {response_dict}, Expected: {expected_value}"
else:
assert response_dict[f"{fv.name}__value"][0] is None
else:
if expected_value:
assert response_dict["value"][0], f"Response: {response_dict}"
assert (
abs(response_dict["value"][0] - expected_value) < 1e-6
abs(response_dict["value"][0] - expected_value) < 1e-6
), f"Response: {response_dict}, Expected: {expected_value}"
else:
assert response_dict["value"][0] is None
Expand All @@ -131,11 +116,11 @@ def check_offline_and_online_features(
if full_feature_names:
if expected_value:
assert (
abs(
df.to_dict(orient="list")[f"{fv.name}__value"][0]
- expected_value
)
< 1e-6
abs(
df.to_dict(orient="list")[f"{fv.name}__value"][0]
- expected_value
)
< 1e-6
)
else:
assert not df.to_dict(orient="list")[f"{fv.name}__value"] or math.isnan(
Expand All @@ -144,7 +129,7 @@ def check_offline_and_online_features(
else:
if expected_value:
assert (
abs(df.to_dict(orient="list")["value"][0] - expected_value) < 1e-6
abs(df.to_dict(orient="list")["value"][0] - expected_value) < 1e-6
)
else:
assert not df.to_dict(orient="list")["value"] or math.isnan(
Expand All @@ -153,7 +138,7 @@ def check_offline_and_online_features(


def run_offline_online_store_consistency_test(
fs: FeatureStore, fv: FeatureView, split_dt: datetime
fs: FeatureStore, fv: FeatureView, split_dt: datetime
) -> None:
now = datetime.utcnow()

Expand Down