Skip to content

Commit b94c9c9

Browse files
sudohainguyenjames-crabtree-sp
authored andcommitted
feat: Make bytewax job write as mini-batches (feast-dev#3777)
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com> Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>
1 parent 5a4f24c commit b94c9c9

2 files changed

Lines changed: 25 additions & 5 deletions

File tree

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from typing import List
23

34
import pyarrow as pa
@@ -11,6 +12,8 @@
1112
from feast import FeatureStore, FeatureView, RepoConfig
1213
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
1314

15+
DEFAULT_BATCH_SIZE = 1000
16+
1417

1518
class BytewaxMaterializationDataflow:
1619
def __init__(
@@ -42,6 +45,11 @@ def input_builder(self, worker_index, worker_count, _state):
4245
return [(None, self.paths[self.worker_index])]
4346

4447
def output_builder(self, worker_index, worker_count):
48+
def yield_batch(iterable, batch_size):
49+
"""Yield mini-batches from an iterable."""
50+
for i in range(0, len(iterable), batch_size):
51+
yield iterable[i : i + batch_size]
52+
4553
def output_fn(batch):
4654
table = pa.Table.from_batches([batch])
4755

@@ -60,12 +68,17 @@ def output_fn(batch):
6068
)
6169
provider = self.feature_store._get_provider()
6270
with tqdm(total=len(rows_to_write)) as progress:
63-
provider.online_write_batch(
64-
config=self.config,
65-
table=self.feature_view,
66-
data=rows_to_write,
67-
progress=progress.update,
71+
# break rows_to_write to mini-batches
72+
batch_size = int(
73+
os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
6874
)
75+
for mini_batch in yield_batch(rows_to_write, batch_size):
76+
provider.online_write_batch(
77+
config=self.config,
78+
table=self.feature_view,
79+
data=mini_batch,
80+
progress=progress.update,
81+
)
6982

7083
return output_fn
7184

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
8888
print_pod_logs_on_failure: bool = True
8989
"""(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
9090

91+
mini_batch_size: int = 1000
92+
""" (optional) Number of rows to process per write operation (default 1000)"""
93+
9194

9295
class BytewaxMaterializationEngine(BatchMaterializationEngine):
9396
def __init__(
@@ -362,6 +365,10 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
362365
"name": "BYTEWAX_STATEFULSET_NAME",
363366
"value": f"dataflow-{job_id}",
364367
},
368+
{
369+
"name": "BYTEWAX_MINI_BATCH_SIZE",
370+
"value": str(self.batch_engine_config.mini_batch_size),
371+
},
365372
]
366373
# Add any Feast configured environment variables
367374
job_env.extend(env)

0 commit comments

Comments
 (0)