|
3 | 3 | import pyarrow as pa |
4 | 4 | import pyarrow.parquet as pq |
5 | 5 | import s3fs |
6 | | -from bytewax import Dataflow, cluster_main # type: ignore |
7 | | -from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute |
8 | | -from bytewax.parse import proc_env |
| 6 | + |
| 7 | +from bytewax.dataflow import Dataflow # type: ignore |
| 8 | +from bytewax.execution import cluster_main |
| 9 | +from bytewax.inputs import ManualInputConfig, distribute |
| 10 | +from bytewax.outputs import ManualOutputConfig |
| 11 | + |
9 | 12 | from tqdm import tqdm |
10 | 13 |
|
11 | 14 | from feast import FeatureStore, FeatureView, RepoConfig |
@@ -37,20 +40,15 @@ def process_path(self, path): |
37 | 40 |
|
38 | 41 | return batches |
39 | 42 |
|
40 | | - def input_builder(self, worker_index, worker_count, resume_epoch): |
| 43 | + def input_builder(self, worker_index, worker_count, _state): |
41 | 44 | worker_paths = distribute(self.paths, worker_index, worker_count) |
42 | | - epoch = 0 |
43 | 45 | for path in worker_paths: |
44 | | - yield AdvanceTo(epoch) |
45 | | - yield Emit(path) |
46 | | - epoch += 1 |
| 46 | + yield None, path |
47 | 47 |
|
48 | 48 | return |
49 | 49 |
|
50 | 50 | def output_builder(self, worker_index, worker_count): |
51 | | - def output_fn(epoch_batch): |
52 | | - _, batch = epoch_batch |
53 | | - |
| 51 | + def output_fn(batch): |
54 | 52 | table = pa.Table.from_batches([batch]) |
55 | 53 |
|
56 | 54 | if self.feature_view.batch_source.field_mapping is not None: |
@@ -79,8 +77,7 @@ def output_fn(epoch_batch): |
79 | 77 |
|
80 | 78 | def _run_dataflow(self): |
81 | 79 | flow = Dataflow() |
| 80 | + flow.input("inp", ManualInputConfig(self.input_builder)) |
82 | 81 | flow.flat_map(self.process_path) |
83 | | - flow.capture() |
84 | | - cluster_main( |
85 | | - flow, ManualInputConfig(self.input_builder), self.output_builder, [], 0 |
86 | | - ) |
| 82 | + flow.capture(ManualOutputConfig(self.output_builder)) |
| 83 | + cluster_main(flow, [], 0) |
0 commit comments