Skip to content

Commit 2313c1d

Browse files
SAASMLOPS-809 fix bytewax workers so they only process a single file (#6)
* SAASMLOPS-809 fix bytewax workers so they only process a single file
* SAASMLOPS-809 fix newlines

Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>
1 parent 71c02f0 commit 2313c1d

3 files changed

Lines changed: 17 additions & 11 deletions

File tree

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pyarrow.parquet as pq
66
from bytewax.dataflow import Dataflow # type: ignore
77
from bytewax.execution import cluster_main
8-
from bytewax.inputs import ManualInputConfig, distribute
8+
from bytewax.inputs import ManualInputConfig
99
from bytewax.outputs import ManualOutputConfig
1010
from tqdm import tqdm
1111

@@ -21,17 +21,25 @@ def __init__(
2121
config: RepoConfig,
2222
feature_view: FeatureView,
2323
paths: List[str],
24+
worker_index: int
2425
):
2526
self.config = config
2627
self.feature_store = FeatureStore(config=config)
2728

2829
self.feature_view = feature_view
30+
self.worker_index = worker_index
2931
self.paths = paths
3032

3133
self._run_dataflow()
3234

3335
def process_path(self, path):
36+
<<<<<<< HEAD
3437
dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
38+
=======
39+
fs = s3fs.S3FileSystem()
40+
logger.info(f"Processing path {path}")
41+
dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
42+
>>>>>>> 15c523a2 (SAASMLOPS-809 fix bytewax workers so they only process a single file (#6))
3543
batches = []
3644
for fragment in dataset.fragments:
3745
for batch in fragment.to_table().to_batches():
@@ -40,11 +48,7 @@ def process_path(self, path):
4048
return batches
4149

4250
def input_builder(self, worker_index, worker_count, _state):
43-
worker_paths = distribute(self.paths, worker_index, worker_count)
44-
for path in worker_paths:
45-
yield None, path
46-
47-
return
51+
return [(None, self.paths[self.worker_index])]
4852

4953
def output_builder(self, worker_index, worker_count):
5054
def yield_batch(iterable, batch_size):

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def _materialize_one(
205205
try:
206206
self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
207207
except ApiException as de:
208-
logger.warning(f"Could not delete job due to API Error: {ae.body}")
208+
logger.warning(f"Could not delete job due to API Error: {de.body}")
209209
raise e
210210
self._print_pod_logs(job.job_id(), feature_view)
211211
return job
@@ -293,7 +293,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
293293
},
294294
{
295295
"name": "BYTEWAX_REPLICAS",
296-
"value": "1",
296+
"value": f"{pods}",
297297
},
298298
{
299299
"name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
@@ -331,8 +331,8 @@ def _create_job_definition(self, job_id, namespace, pods, env):
331331
"spec": {
332332
"ttlSecondsAfterFinished": 3600,
333333
"backoffLimit": self.batch_engine_config.retry_limit,
334-
"completions": 1,
335-
"parallelism": 1,
334+
"completions": pods,
335+
"parallelism": min(pods, self.batch_engine_config.max_parallelism),
336336
"completionMode": "Indexed",
337337
"template": {
338338
"metadata": {
@@ -349,7 +349,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
349349
"env": [
350350
{
351351
"name": "BYTEWAX_REPLICAS",
352-
"value": "1",
352+
"value": f"{pods}",
353353
}
354354
],
355355
"image": "busybox",

sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import yaml
23

34
from feast import FeatureStore, RepoConfig
@@ -19,4 +20,5 @@
1920
config,
2021
store.get_feature_view(bytewax_config["feature_view"]),
2122
bytewax_config["paths"],
23+
int(os.environ["JOB_COMPLETION_INDEX"])
2224
)

0 commit comments

Comments (0)