Skip to content

Commit 5f40a7a

Browse files
authored
[Benchmarks] make batch size configurable (#2490)
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent d224abc commit 5f40a7a

3 files changed

Lines changed: 27 additions & 8 deletions

File tree

benchmark/feldera-sql/benchmarks/nexmark/table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ CREATE TABLE person (
2020
"options": {{
2121
"events": {events},
2222
"threads": {cores},
23-
"batch_size_per_thread": 1000,
24-
"max_step_size": 1000000
23+
"batch_size_per_thread": {batchsize},
24+
"max_step_size": {batchsize}
2525
}}
2626
}}
2727
}}

benchmark/feldera-sql/run.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def load_queries(dir):
3030
queries[f.split('.')[0]] = file.read()
3131
return queries
3232

33-
def load_table(folder: str, with_lateness: bool, suffix, events, cores):
33+
def load_table(folder: str, with_lateness: bool, suffix, events, cores, batchsize):
3434
p = os.path.join(FILE_DIR, folder, 'table.sql')
3535
file = open(p, 'r')
3636
text = file.read()
@@ -48,6 +48,7 @@ def load_table(folder: str, with_lateness: bool, suffix, events, cores):
4848
subst["events"] = events
4949
subst["cores"] = cores
5050
subst["folder"] = os.path.join(FILE_DIR, folder)
51+
subst["batchsize"] = batchsize
5152
return text.format(**subst)
5253

5354
def sort_queries(queries):
@@ -144,6 +145,7 @@ def main():
144145
group.add_argument("--api-url", required=True, help="Feldera API URL (e.g., http://localhost:8080 )")
145146
group.add_argument("--api-key", required=False, help="Feldera API key (e.g., \"apikey:0123456789ABCDEF\")")
146147
group.add_argument("--cores", type=int, help="Number of cores to use for workers (default: 16)")
148+
group.add_argument("--batchsize", type=int, help="Batch size to use for input (default: 10000)")
147149
group.add_argument('--storage', action=argparse.BooleanOptionalAction, help='whether to enable storage (default: --no-storage)')
148150
group.add_argument('--min-storage-bytes', type=int, help='If storage is enabled, the minimum number of bytes to write a batch to storage.')
149151
group.add_argument('--folder', help='Folder with table and queries, organized as folder/table.sql, folder/queries/qN.sql for numbers N (default: benchmarks/nexmark)')
@@ -163,7 +165,7 @@ def main():
163165
help="Kafka options passed as -O option=value, e.g., -O bootstrap.servers=localhost:9092; ignored for Nexmark, required for other benchmarks")
164166
group.add_argument("--poller-threads", required=False, type=int, help="Override number of poller threads to use")
165167
group.add_argument('--input-topic-suffix', help='suffix to apply to input topic names (by default, "")')
166-
parser.set_defaults(lateness=True, storage=False, cores=16, metrics_interval=1, folder='benchmarks/nexmark', events=100000)
168+
parser.set_defaults(lateness=True, storage=False, batchsize=10000, cores=16, metrics_interval=1, folder='benchmarks/nexmark', events=100000)
167169

168170

169171
global api_url, kafka_options, headers
@@ -177,9 +179,10 @@ def main():
177179
suffix = parser.parse_args().input_topic_suffix or ''
178180
events = parser.parse_args().events
179181
cores = int(parser.parse_args().cores)
182+
batchsize = int(parser.parse_args().batchsize)
180183

181184
folder = parser.parse_args().folder
182-
table = load_table(folder, parser.parse_args().lateness, suffix, events, cores)
185+
table = load_table(folder, parser.parse_args().lateness, suffix, events, cores, batchsize)
183186
all_queries = load_queries(os.path.join(FILE_DIR, folder + '/queries/'))
184187
include_disabled = parser.parse_args().include_disabled or False
185188
disabled_folder = os.path.join(FILE_DIR, folder + '/disabled-queries/')
@@ -267,16 +270,20 @@ def main():
267270
while True:
268271
req = requests.get(f"{api_url}/v0/pipelines/{full_name}/stats", headers=headers)
269272
if req.status_code != 200:
270-
print("Failed to get stats")
271-
error = True
273+
print("Failed to get stats: ", req)
274+
if req.status_code == 400:
272275
break
276+
273277
stats = req.json()
278+
#for input in stats["inputs"]:
279+
# print(input["endpoint_name"], input["metrics"]["end_of_input"])
274280
elapsed = time.time() - start
275281
if "global_metrics" in stats:
276282
global_metrics = stats["global_metrics"]
277283
processed = global_metrics["total_processed_records"]
278284
peak_memory = max(peak_memory, global_metrics["rss_bytes"])
279285
cpu_msecs = global_metrics.get("cpu_msecs", 0)
286+
last_metrics = elapsed
280287
metrics_dict = {"name":pipeline_name, "elapsed_seconds":elapsed}
281288
for key, value in global_metrics.items():
282289
metrics_seen.add(key)
@@ -298,6 +305,7 @@ def main():
298305
metrics_dict[k] = value["Histogram"][v]
299306
pipeline_metrics += [metrics_dict]
300307
late_drops = metrics_dict.get("records_late", 0)
308+
steps = metrics_dict.get("feldera_dbsp_step", 0)
301309
if processed > last_processed:
302310
before, after = ('\r', '') if os.isatty(1) else ('', '\n')
303311
peak_gib = peak_memory / 1024 / 1024 / 1024

benchmark/run-nexmark.sh

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ project=
2323
bucket=
2424
region=us-west1
2525
core_quota=24
26+
batchsize=10000
2627

2728
nextarg=
2829
for arg
@@ -69,6 +70,12 @@ do
6970
--cores=*)
7071
cores=${arg#--cores=}
7172
;;
73+
--batchsize=*)
74+
batchsize=${arg#--batchsize=}
75+
;;
76+
--batchsize)
77+
nextarg=batchsize
78+
;;
7279
--output|-o)
7380
nextarg=output
7481
;;
@@ -128,6 +135,7 @@ The following options are supported:
128135
-e, --events=EVENTS Run EVENTS events (default: 100k)
129136
-L, --language=LANG Use given query LANG: default sql
130137
-c, --cores=CORES Use CORES cores for computation (default: min(16,nproc))
138+
--batchsize=BS Batch size to use for input (default: 10000)
131139
-q, --query=QUERY Queries to run (default: all)
132140
-o, --output=OUTPUT Append CSV-formatted output to OUTPUT (default: nexmark.csv).
133141
@@ -329,6 +337,7 @@ feldera2csv() {
329337
esac
330338
parse_time() {
331339
case $1 in
340+
*ns) echo "${1%ns}/1000000000" | bc -l ;;
332341
*ms) echo "${1%ms}/1000" | bc -l ;;
333342
*s) echo "${1%s}" ;;
334343
esac
@@ -443,6 +452,7 @@ Running Nexmark suite with configuration:
443452
events: $events
444453
query: $query
445454
cores: $cores
455+
batchsize: $batchsize
446456
EOF
447457
case $runner:$language in
448458
feldera:default)
@@ -472,7 +482,8 @@ case $runner:$language in
472482
--cores $cores \
473483
--events $events \
474484
--csv results.csv \
475-
--circuit-profile \
485+
--batchsize $batchsize \
486+
--circuit-profile \
476487
$(if $storage; then printf "%s" --storage; fi) \
477488
--query $(if test $query = all; then echo all; else echo q$query; fi)
478489
;;

0 commit comments

Comments
 (0)