Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Respond to PR feedback
- Add integration test, by factoring out shared consistency test.
- Make the number of Pods dynamic, based on the number of .parquet
  file paths.
- Add instructions for creating a bytewax test cluster for
  integration testing.

Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
  • Loading branch information
whoahbot committed Aug 15, 2022
commit 6272a19a6a86bc7837241ae37ece77827cf88220
6 changes: 3 additions & 3 deletions docs/reference/batch-materialization/bytewax.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Then configure them in the batch_engine section of `feature_store.yaml`:
batch_engine:
type: bytewax
namespace: bytewax
pods: 3
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
Expand All @@ -51,9 +50,10 @@ The Bytewax materialization engine is configured through the `feature_store.
batch_engine:
type: bytewax
namespace: bytewax
pods: 3
image: bytewax/bytewax-feast:latest
```

The `namespace` configuration directive specifies which Kubernetes [namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/) jobs, services and configuration maps will be created in.

The `pods` configuration directive is used to configure the number of Bytewax pods that will be started for each FeatureView that is being materialized.
The `image` parameter specifies which container image to use when running the materialization job. To create a custom image based on this container, please see the [GitHub repository](https://github.com/bytewax/bytewax-feast) for this image.

Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Any, Iterable, List
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
from bytewax import Dataflow, cluster_main
from bytewax import Dataflow, cluster_main # type: ignore
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
from bytewax.parse import proc_env
from tqdm import tqdm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from datetime import datetime
from typing import Callable, List, Literal, Sequence, Union

import kubernetes
import kubernetes.client
import yaml
from kubernetes import client
from kubernetes import config as k8s_config
Expand Down Expand Up @@ -36,16 +34,13 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
type: Literal["bytewax"] = "bytewax"
""" Materialization type selector"""

pods: int = 3
""" (optional) The number of Kubernetes pods to create.
For each feature view to be materialized, Bytewax will create a job with the specified
number of pods and distribute the work among them.
"""

namespace: StrictStr = "default"
""" (optional) The namespace in Kubernetes to use when creating services, configuration maps and jobs.
"""

image: StrictStr = "bytewax/bytewax-feast:latest"
""" (optional) The container image to use when running the materialization job."""

env: List[dict] = []
""" (optional) A list of environment variables to set in the created Kubernetes pods.
These environment variables can be used to reference Kubernetes secrets.
Expand Down Expand Up @@ -169,7 +164,7 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
self._create_job_definition(
job_id,
self.namespace,
self.batch_engine_config.pods,
len(paths), # Create a pod for each parquet file
self.batch_engine_config.env,
)
except FailToCreateError as failures:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from typing import Optional

import kubernetes
import kubernetes.client
from kubernetes import client
from kubernetes import config as k8s_config
from kubernetes import utils

from feast.infra.materialization.batch_materialization_engine import (
MaterializationJob,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Running Bytewax integration tests

To run the Bytewax integration tests, you'll need to provision a cluster using [eksctl](https://docs.aws.amazon.com/eks/latest/userguide/eksctl.html).

## Creating an EKS cluster

In this directory is a configuration file for a single-node EKS cluster.

To create the EKS cluster needed for testing, issue the following command:

``` shell
> eksctl create cluster -f ./eks-config.yaml
```

When the tests are complete, delete the created cluster with:

``` shell
> eksctl delete cluster --name bytewax-feast-cluster
```



Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# eksctl configuration for the EKS cluster used by the Bytewax
# integration tests (created/deleted via the commands in the README).
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
name: bytewax-feast-cluster
version: "1.22"
region: us-west-2

# Single small managed node — the README describes this as a
# single-node test cluster, so one c6a.large instance is provisioned.
managedNodeGroups:
- name: ng-1
instanceType: c6a.large
desiredCapacity: 1
privateNetworking: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from datetime import timedelta

import pytest

from feast import Entity, Feature, FeatureView, ValueType
from tests.data.data_creator import create_basic_driver_dataset
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
RegistryLocation,
)
from tests.integration.feature_repos.repo_configuration import (
construct_test_environment,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
RedshiftDataSourceCreator,
)
from tests.utils.e2e_test_validation import validate_offline_online_store_consistency


@pytest.mark.integration
def test_bytewax_materialization():
    """End-to-end materialization test for the Bytewax batch engine.

    Builds a feature repo backed by Redshift (offline) and DynamoDB
    (online), applies a driver entity and feature view, then checks
    offline/online store consistency after materializing with Bytewax.
    """
    repo_config = IntegrationTestRepoConfig(
        provider="aws",
        online_store={"type": "dynamodb", "region": "us-west-2"},
        offline_store_creator=RedshiftDataSourceCreator,
        batch_engine={
            "type": "bytewax",
        },
        registry_location=RegistryLocation.S3,
    )
    environment = construct_test_environment(repo_config, None)
    feature_store = environment.feature_store

    driver_df = create_basic_driver_dataset()
    data_source = environment.data_source_creator.create_data_source(
        driver_df,
        feature_store.project,
        field_mapping={"ts_1": "ts"},
    )

    driver_entity = Entity(
        name="driver_id",
        join_key="driver_id",
        value_type=ValueType.INT64,
    )

    driver_stats_fv = FeatureView(
        name="driver_hourly_stats",
        entities=["driver_id"],
        ttl=timedelta(weeks=52),
        features=[Feature(name="value", dtype=ValueType.FLOAT)],
        batch_source=data_source,
    )

    try:
        feature_store.apply([driver_entity, driver_stats_fv])

        # Materialization is run in two steps and a timestamp from the
        # generated dataframe is used as the split point between them.
        split_dt = driver_df["ts_1"][4].to_pydatetime() - timedelta(seconds=1)

        print(f"Split datetime: {split_dt}")

        validate_offline_online_store_consistency(
            feature_store, driver_stats_fv, split_dt
        )
    finally:
        # Always clean up the cloud resources, even if validation fails.
        feature_store.teardown()