Skip to content

Commit a8d4ac4

Browse files
committed
Update Bytewax to latest version.
Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
1 parent 5290b0f commit a8d4ac4

File tree

2 files changed

+13
-16
lines changed

2 files changed

+13
-16
lines changed

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import pyarrow as pa
44
import pyarrow.parquet as pq
55
import s3fs
6-
from bytewax import Dataflow, cluster_main # type: ignore
7-
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
8-
from bytewax.parse import proc_env
6+
7+
from bytewax.dataflow import Dataflow # type: ignore
8+
from bytewax.execution import cluster_main
9+
from bytewax.inputs import ManualInputConfig, distribute
10+
from bytewax.outputs import ManualOutputConfig
11+
912
from tqdm import tqdm
1013

1114
from feast import FeatureStore, FeatureView, RepoConfig
@@ -37,20 +40,15 @@ def process_path(self, path):
3740

3841
return batches
3942

40-
def input_builder(self, worker_index, worker_count, resume_epoch):
43+
def input_builder(self, worker_index, worker_count, _state):
4144
worker_paths = distribute(self.paths, worker_index, worker_count)
42-
epoch = 0
4345
for path in worker_paths:
44-
yield AdvanceTo(epoch)
45-
yield Emit(path)
46-
epoch += 1
46+
yield None, path
4747

4848
return
4949

5050
def output_builder(self, worker_index, worker_count):
51-
def output_fn(epoch_batch):
52-
_, batch = epoch_batch
53-
51+
def output_fn(batch):
5452
table = pa.Table.from_batches([batch])
5553

5654
if self.feature_view.batch_source.field_mapping is not None:
@@ -79,8 +77,7 @@ def output_fn(epoch_batch):
7977

8078
def _run_dataflow(self):
8179
flow = Dataflow()
80+
flow.input("inp", ManualInputConfig(self.input_builder))
8281
flow.flat_map(self.process_path)
83-
flow.capture()
84-
cluster_main(
85-
flow, ManualInputConfig(self.input_builder), self.output_builder, [], 0
86-
)
82+
flow.capture(ManualOutputConfig(self.output_builder))
83+
cluster_main(flow, [], 0)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393

9494
AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"]
9595

96-
BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"]
96+
BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"]
9797

9898
SNOWFLAKE_REQUIRED = [
9999
"snowflake-connector-python[pandas]>=2.7.3,<3",

0 commit comments

Comments
 (0)