Skip to content

Commit a9f7463

Browse files
committed
feat: Make bytewax job write as mini-batches
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
1 parent 2192e65 commit a9f7463

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from typing import List
23

34
import pyarrow as pa
@@ -12,6 +13,8 @@
1213
from feast import FeatureStore, FeatureView, RepoConfig
1314
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
1415

16+
DEFAULT_BATCH_SIZE = 1000
17+
1518

1619
class BytewaxMaterializationDataflow:
1720
def __init__(
@@ -46,6 +49,11 @@ def input_builder(self, worker_index, worker_count, _state):
4649
return
4750

4851
def output_builder(self, worker_index, worker_count):
52+
def yield_batch(iterable, batch_size):
53+
"""Yield mini-batches from an iterable."""
54+
for i in range(0, len(iterable), batch_size):
55+
yield iterable[i : i + batch_size]
56+
4957
def output_fn(batch):
5058
table = pa.Table.from_batches([batch])
5159

@@ -64,12 +72,17 @@ def output_fn(batch):
6472
)
6573
provider = self.feature_store._get_provider()
6674
with tqdm(total=len(rows_to_write)) as progress:
67-
provider.online_write_batch(
68-
config=self.config,
69-
table=self.feature_view,
70-
data=rows_to_write,
71-
progress=progress.update,
75+
# break rows_to_write to mini-batches
76+
batch_size = int(
77+
os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
7278
)
79+
for mini_batch in yield_batch(rows_to_write, batch_size):
80+
provider.online_write_batch(
81+
config=self.config,
82+
table=self.feature_view,
83+
data=mini_batch,
84+
progress=progress.update,
85+
)
7386

7487
return output_fn
7588

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
6767
max_parallelism: int = 10
6868
""" (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""
6969

70+
mini_batch_size: int = 1000
71+
""" (optional) Number of rows to process per write operation (default 1000)"""
72+
7073

7174
class BytewaxMaterializationEngine(BatchMaterializationEngine):
7275
def __init__(
@@ -254,6 +257,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
254257
"name": "BYTEWAX_STATEFULSET_NAME",
255258
"value": f"dataflow-{job_id}",
256259
},
260+
{
261+
"name": "BYTEWAX_MINI_BATCH_SIZE",
262+
"value": str(self.batch_engine_config.mini_batch_size),
263+
},
257264
]
258265
# Add any Feast configured environment variables
259266
job_env.extend(env)

0 commit comments

Comments
 (0)