Skip to content

Commit 4ebe00f

Browse files
authored
fix: Update bytewax materialization (#3368)
* Updates Bytewax materialization engine. - Since no data is exchanged between workers, remove k8s service definition. - Update bytewax dataflow to only use one worker. Signed-off-by: Dan Herrera <whoahbot@bytewax.io> * Update Bytewax to latest version. Signed-off-by: Dan Herrera <whoahbot@bytewax.io> * Format imports. Signed-off-by: Dan Herrera <whoahbot@bytewax.io> Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
1 parent eaf354c commit 4ebe00f

File tree

3 files changed

+16
-75
lines changed

3 files changed

+16
-75
lines changed

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import pyarrow as pa
44
import pyarrow.parquet as pq
55
import s3fs
6-
from bytewax import Dataflow, cluster_main # type: ignore
7-
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
8-
from bytewax.parse import proc_env
6+
from bytewax.dataflow import Dataflow # type: ignore
7+
from bytewax.execution import cluster_main
8+
from bytewax.inputs import ManualInputConfig, distribute
9+
from bytewax.outputs import ManualOutputConfig
910
from tqdm import tqdm
1011

1112
from feast import FeatureStore, FeatureView, RepoConfig
@@ -37,20 +38,15 @@ def process_path(self, path):
3738

3839
return batches
3940

40-
def input_builder(self, worker_index, worker_count, resume_epoch):
41+
def input_builder(self, worker_index, worker_count, _state):
4142
worker_paths = distribute(self.paths, worker_index, worker_count)
42-
epoch = 0
4343
for path in worker_paths:
44-
yield AdvanceTo(epoch)
45-
yield Emit(path)
46-
epoch += 1
44+
yield None, path
4745

4846
return
4947

5048
def output_builder(self, worker_index, worker_count):
51-
def output_fn(epoch_batch):
52-
_, batch = epoch_batch
53-
49+
def output_fn(batch):
5450
table = pa.Table.from_batches([batch])
5551

5652
if self.feature_view.batch_source.field_mapping is not None:
@@ -79,11 +75,7 @@ def output_fn(epoch_batch):
7975

8076
def _run_dataflow(self):
8177
flow = Dataflow()
78+
flow.input("inp", ManualInputConfig(self.input_builder))
8279
flow.flat_map(self.process_path)
83-
flow.capture()
84-
cluster_main(
85-
flow,
86-
ManualInputConfig(self.input_builder),
87-
self.output_builder,
88-
**proc_env(),
89-
)
80+
flow.capture(ManualOutputConfig(self.output_builder))
81+
cluster_main(flow, [], 0)

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

Lines changed: 5 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from feast.infra.registry.base_registry import BaseRegistry
2424
from feast.repo_config import FeastConfigBaseModel
2525
from feast.stream_feature_view import StreamFeatureView
26-
from feast.utils import _get_column_names
26+
from feast.utils import _get_column_names, get_default_yaml_file_path
2727

2828
from .bytewax_materialization_job import BytewaxMaterializationJob
2929

@@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
157157
# Create a k8s configmap with information needed by bytewax
158158
self._create_configuration_map(job_id, paths, feature_view, self.namespace)
159159

160-
# Create the k8s service definition, used for bytewax communication
161-
self._create_service_definition(job_id, self.namespace)
162-
163160
# Create the k8s job definition
164161
self._create_job_definition(
165162
job_id,
@@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
175172
def _create_configuration_map(self, job_id, paths, feature_view, namespace):
176173
"""Create a Kubernetes configmap for this job"""
177174

178-
feature_store_configuration = yaml.dump(
179-
yaml.safe_load(
180-
self.repo_config.json(
181-
exclude={"repo_path"},
182-
exclude_unset=True,
183-
)
184-
)
185-
)
175+
repo_path = self.repo_config.repo_path
176+
assert repo_path
177+
feature_store_path = get_default_yaml_file_path(repo_path)
178+
feature_store_configuration = feature_store_path.read_text()
186179

187180
materialization_config = yaml.dump(
188181
{"paths": paths, "feature_view": feature_view.name}
@@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
204197
body=configmap_manifest,
205198
)
206199

207-
def _create_service_definition(self, job_id, namespace):
208-
"""Creates a kubernetes service definition.
209-
210-
This service definition is created to allow bytewax workers
211-
to communicate with each other.
212-
"""
213-
service_definition = {
214-
"apiVersion": "v1",
215-
"kind": "Service",
216-
"metadata": {
217-
"name": f"dataflow-{job_id}",
218-
"namespace": namespace,
219-
},
220-
"spec": {
221-
"clusterIP": "None",
222-
"clusterIPs": ["None"],
223-
"internalTrafficPolicy": "Cluster",
224-
"ipFamilies": ["IPv4"],
225-
"ipFamilyPolicy": "SingleStack",
226-
"ports": [
227-
{
228-
"name": "worker",
229-
"port": 9999,
230-
"protocol": "TCP",
231-
"targetPort": 9999,
232-
}
233-
],
234-
"selector": {"job-name": f"dataflow-{job_id}"},
235-
"sessionAffinity": "None",
236-
"type": "ClusterIP",
237-
},
238-
}
239-
240-
utils.create_from_dict(self.k8s_client, service_definition)
241-
242200
def _create_job_definition(self, job_id, namespace, pods, env):
243201
"""Create a kubernetes job definition."""
244202
job_env = [
@@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
269227
"name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
270228
"value": "false",
271229
},
272-
{
273-
"name": "BYTEWAX_HOSTFILE_PATH",
274-
"value": "/etc/bytewax/hostfile.txt",
275-
},
276230
{
277231
"name": "BYTEWAX_STATEFULSET_NAME",
278232
"value": f"dataflow-{job_id}",
@@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
299253
"subdomain": f"dataflow-{job_id}",
300254
"initContainers": [
301255
{
302-
"command": [
303-
"sh",
304-
"-c",
305-
f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone',
306-
],
307256
"env": [
308257
{
309258
"name": "BYTEWAX_REPLICAS",

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393

9494
AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"]
9595

96-
BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"]
96+
BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"]
9797

9898
SNOWFLAKE_REQUIRED = [
9999
"snowflake-connector-python[pandas]>=2.7.3,<3",

0 commit comments

Comments
 (0)