Skip to content

Commit 55c61f9

Browse files
authored
feat: Initial Bytewax materialization engine (feast-dev#2974)
* feat: Initial Bytewax materialization engine Signed-off-by: Dan Herrera <whoahbot@bytewax.io> * Respond to PR feedback - Add integration test, by factoring out shared consistency test. - Make the number of Pods dynamic, based on the number of .parquet file paths. - Add instructions for creating a bytewax test cluster for integration testing. Signed-off-by: Dan Herrera <whoahbot@bytewax.io> * Mark bytewax test to be skipped. Signed-off-by: Dan Herrera <whoahbot@bytewax.io> * Remove unused offline store reference. Signed-off-by: Dan Herrera <whoahbot@bytewax.io> Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
1 parent 41851be commit 55c61f9

16 files changed

Lines changed: 784 additions & 1 deletion

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Batch materialization
2+
3+
Please see [Batch Materialization Engine](../../getting-started/architecture-and-components/batch-materialization-engine.md) for an explanation of batch materialization engines.
4+
5+
{% page-ref page="bytewax.md" %}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Bytewax
2+
3+
## Description
4+
5+
The [Bytewax](https://bytewax.io) batch materialization engine provides an execution
6+
engine for batch materializing operations (`materialize` and `materialize-incremental`).
7+
8+
### Guide
9+
10+
In order to use the Bytewax materialization engine, you will need a [Kubernetes](https://kubernetes.io/) cluster running version 1.22.10 or greater.
11+
12+
#### Kubernetes Authentication
13+
14+
The Bytewax materialization engine loads authentication and cluster information from the [kubeconfig file](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/). By default, kubectl looks for a file named `config` in the `$HOME/.kube directory`. You can specify other kubeconfig files by setting the `KUBECONFIG` environment variable.
15+
16+
#### Resource Authentication
17+
18+
Bytewax jobs can be configured to access [Kubernetes secrets](https://kubernetes.io/docs/concepts/configuration/secret/) as environment variables to access online and offline stores during job runs.
19+
20+
To configure secrets, first create them using `kubectl`:
21+
22+
``` shell
23+
kubectl create secret generic -n bytewax aws-credentials --from-literal=aws-access-key-id='<access key id>' --from-literal=aws-secret-access-key='<secret access key>'
24+
```
25+
26+
Then configure them in the batch_engine section of `feature_store.yaml`:
27+
28+
``` yaml
29+
batch_engine:
30+
type: bytewax
31+
namespace: bytewax
32+
env:
33+
- name: AWS_ACCESS_KEY_ID
34+
valueFrom:
35+
secretKeyRef:
36+
name: aws-credentials
37+
key: aws-access-key-id
38+
- name: AWS_SECRET_ACCESS_KEY
39+
valueFrom:
40+
secretKeyRef:
41+
name: aws-credentials
42+
key: aws-secret-access-key
43+
```
44+
45+
#### Configuration
46+
47+
The Bytewax materialization engine is configured through the The `feature_store.yaml` configuration file:
48+
49+
``` yaml
50+
batch_engine:
51+
type: bytewax
52+
namespace: bytewax
53+
image: bytewax/bytewax-feast:latest
54+
```
55+
56+
The `namespace` configuration directive specifies which Kubernetes [namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/) jobs, services and configuration maps will be created in.
57+
58+
The `image` parameter specifies which container image to use when running the materialization job. To create a custom image based on this container, please see the [GitHub repository](https://github.com/bytewax/bytewax-feast) for this image.
59+

sdk/python/docs/index.rst

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ Local Engine
310310
(Alpha) Lambda Based Engine
311311
---------------------------
312312

313-
.. autoclass:: feast.infra.materialization.lambda.lambda_engine
313+
.. automodule:: feast.infra.materialization.lambda.lambda_engine
314+
:members:
315+
:noindex:
316+
317+
318+
Bytewax Engine
319+
---------------------------
320+
321+
.. automodule:: feast.infra.materialization.contrib.bytewax
314322
:members:
315323
:noindex:
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
feast.infra.materialization.contrib.bytewax package
2+
=================================================================
3+
4+
Submodules
5+
----------
6+
7+
feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_engine
8+
----------------------------------------------------------------------
9+
10+
.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine
11+
:members:
12+
:undoc-members:
13+
:show-inheritance:
14+
15+
feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_job
16+
----------------------------------------------------------------------
17+
18+
.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_job
19+
:members:
20+
:undoc-members:
21+
:show-inheritance:
22+
23+
Module contents
24+
---------------
25+
26+
.. automodule:: feast.infra.materialization.contrib.bytewax
27+
:members:
28+
:undoc-members:
29+
:show-inheritance:
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
feast.infra.materialization.contrib package
2+
==========================================
3+
4+
Subpackages
5+
-----------
6+
7+
.. toctree::
8+
:maxdepth: 4
9+
10+
feast.infra.materialization.contrib.bytewax

sdk/python/docs/source/index.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,11 @@ Local Engine
313313
.. autoclass:: feast.infra.materialization.lambda.lambda_engine
314314
:members:
315315
:noindex:
316+
317+
318+
Bytewax Engine
319+
---------------------------
320+
321+
.. automodule:: feast.infra.materialization.contrib.bytewax
322+
:members:
323+
:noindex:
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from .bytewax_materialization_dataflow import BytewaxMaterializationDataflow
2+
from .bytewax_materialization_engine import (
3+
BytewaxMaterializationEngine,
4+
BytewaxMaterializationEngineConfig,
5+
)
6+
from .bytewax_materialization_job import BytewaxMaterializationJob
7+
from .bytewax_materialization_task import BytewaxMaterializationTask
8+
9+
__all__ = [
10+
"BytewaxMaterializationTask",
11+
"BytewaxMaterializationJob",
12+
"BytewaxMaterializationDataflow",
13+
"BytewaxMaterializationEngine",
14+
"BytewaxMaterializationEngineConfig",
15+
]
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from typing import List
2+
3+
import pyarrow as pa
4+
import pyarrow.parquet as pq
5+
import s3fs
6+
from bytewax import Dataflow, cluster_main # type: ignore
7+
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
8+
from bytewax.parse import proc_env
9+
from tqdm import tqdm
10+
11+
from feast import FeatureStore, FeatureView, RepoConfig
12+
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
13+
14+
15+
class BytewaxMaterializationDataflow:
16+
def __init__(
17+
self,
18+
config: RepoConfig,
19+
feature_view: FeatureView,
20+
paths: List[str],
21+
):
22+
self.config = config
23+
self.feature_store = FeatureStore(config=config)
24+
25+
self.feature_view = feature_view
26+
self.paths = paths
27+
28+
self._run_dataflow()
29+
30+
def process_path(self, path):
31+
fs = s3fs.S3FileSystem()
32+
dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
33+
batches = []
34+
for fragment in dataset.fragments:
35+
for batch in fragment.to_table().to_batches():
36+
batches.append(batch)
37+
38+
return batches
39+
40+
def input_builder(self, worker_index, worker_count, resume_epoch):
41+
worker_paths = distribute(self.paths, worker_index, worker_count)
42+
epoch = 0
43+
for path in worker_paths:
44+
yield AdvanceTo(epoch)
45+
yield Emit(path)
46+
epoch += 1
47+
48+
return
49+
50+
def output_builder(self, worker_index, worker_count):
51+
def output_fn(epoch_batch):
52+
_, batch = epoch_batch
53+
54+
table = pa.Table.from_batches([batch])
55+
56+
if self.feature_view.batch_source.field_mapping is not None:
57+
table = _run_pyarrow_field_mapping(
58+
table, self.feature_view.batch_source.field_mapping
59+
)
60+
61+
join_key_to_value_type = {
62+
entity.name: entity.dtype.to_value_type()
63+
for entity in self.feature_view.entity_columns
64+
}
65+
66+
rows_to_write = _convert_arrow_to_proto(
67+
table, self.feature_view, join_key_to_value_type
68+
)
69+
provider = self.feature_store._get_provider()
70+
with tqdm(total=len(rows_to_write)) as progress:
71+
provider.online_write_batch(
72+
config=self.config,
73+
table=self.feature_view,
74+
data=rows_to_write,
75+
progress=progress.update,
76+
)
77+
78+
return output_fn
79+
80+
def _run_dataflow(self):
81+
flow = Dataflow()
82+
flow.flat_map(self.process_path)
83+
flow.capture()
84+
cluster_main(
85+
flow,
86+
ManualInputConfig(self.input_builder),
87+
self.output_builder,
88+
**proc_env(),
89+
)

0 commit comments

Comments
 (0)