Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update Bytewax to latest version.
Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
  • Loading branch information
whoahbot committed Nov 30, 2022
commit a8d4ac471cbcd2397849d5621ea06be0d6f20689
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
from bytewax import Dataflow, cluster_main # type: ignore
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
from bytewax.parse import proc_env

from bytewax.dataflow import Dataflow # type: ignore
from bytewax.execution import cluster_main
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.outputs import ManualOutputConfig

from tqdm import tqdm

from feast import FeatureStore, FeatureView, RepoConfig
Expand Down Expand Up @@ -37,20 +40,15 @@ def process_path(self, path):

return batches

def input_builder(self, worker_index, worker_count, resume_epoch):
def input_builder(self, worker_index, worker_count, _state):
worker_paths = distribute(self.paths, worker_index, worker_count)
epoch = 0
for path in worker_paths:
yield AdvanceTo(epoch)
yield Emit(path)
epoch += 1
yield None, path

return

def output_builder(self, worker_index, worker_count):
def output_fn(epoch_batch):
_, batch = epoch_batch

def output_fn(batch):
table = pa.Table.from_batches([batch])

if self.feature_view.batch_source.field_mapping is not None:
Expand Down Expand Up @@ -79,8 +77,7 @@ def output_fn(epoch_batch):

def _run_dataflow(self):
flow = Dataflow()
flow.input("inp", ManualInputConfig(self.input_builder))
flow.flat_map(self.process_path)
flow.capture()
cluster_main(
flow, ManualInputConfig(self.input_builder), self.output_builder, [], 0
)
flow.capture(ManualOutputConfig(self.output_builder))
cluster_main(flow, [], 0)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@

AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"]

BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"]
BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"]

SNOWFLAKE_REQUIRED = [
"snowflake-connector-python[pandas]>=2.7.3,<3",
Expand Down