Skip to content

Commit c50cb95

Browse files
committed
[benchmarking] add u64N program
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 5bcdf94 commit c50cb95

File tree

1 file changed

+27
-8
lines changed

1 file changed

+27
-8
lines changed

scripts/scalebench.py

Lines changed: 27 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
1818
```
1919
$ ./scripts/scalebench.py --mem-bw --program u64 --workers 1,4,8,16,20 \
20-
--payload 8 --duration 120 --repetition 1 \
20+
--payload-bytes 8 --duration 120 --repetition 1 \
2121
--version-suffix dbsp-sorting
2222
```
2323
"""
@@ -41,7 +41,7 @@
4141
from feldera import FelderaClient, PipelineBuilder
4242
from feldera.runtime_config import RuntimeConfig
4343

44-
PROGRAMS = ["u64", "binary", "string", "binary_primary_key"]
44+
PROGRAMS = ["u64", "u64N", "binary", "string", "binary_primary_key"]
4545
WORKER_COUNTS = [1, 2, 4, 8, 12, 16, 20]
4646
DEFAULT_PAYLOAD_BYTES = [8, 128, 512, 4096, 32768]
4747
BENCH_DURATION_S = 120
@@ -91,13 +91,36 @@ def normalize_payload_bytes(
9191
"Program u64 does not support payload sizes other than 8 bytes."
9292
)
9393
return [8]
94+
if program == "u64N":
95+
invalid = [size for size in payload_bytes if size % 8 != 0]
96+
if invalid:
97+
invalid_csv = ",".join(str(size) for size in invalid)
98+
raise SystemExit(
99+
"Program u64N requires payload sizes that are multiples of 8 bytes. "
100+
f"Invalid values: {invalid_csv}"
101+
)
94102
return payload_bytes
95103

96104

97105
def make_sql(program: str, datagen_workers: int, payload_bytes: int) -> str:
98106
if program == "u64":
99107
table_fields = " payload BIGINT NOT NULL\n"
100108
fields = None
109+
elif program == "u64N":
110+
if payload_bytes % 8 != 0:
111+
raise ValueError(
112+
f"Program u64N requires payload-bytes to be a multiple of 8, got {payload_bytes}"
113+
)
114+
u64_count = payload_bytes // 8
115+
if u64_count <= 0:
116+
raise ValueError(
117+
f"Program u64N requires payload-bytes >= 8, got {payload_bytes}"
118+
)
119+
table_fields = ",\n".join(
120+
f" payload{i} BIGINT NOT NULL" for i in range(u64_count)
121+
)
122+
table_fields += "\n"
123+
fields = {f"payload{i}": {"strategy": "uniform"} for i in range(u64_count)}
101124
elif program == "binary":
102125
table_fields = " payload BINARY NOT NULL\n"
103126
fields = {
@@ -400,11 +423,7 @@ def extract_metrics(payload: Dict[str, Any], pipeline_name: str) -> Dict[str, An
400423
storage = metrics.get("storage") or {}
401424
uptime = metrics.get("uptime") or {}
402425
state_amp = metrics.get("state-amplification") or {}
403-
buffered = (
404-
metrics.get("buffered_input_records")
405-
or metrics.get("buffered-input-records")
406-
or {}
407-
)
426+
buffered = metrics.get("buffered-input-records") or {}
408427

409428
return {
410429
"throughput_value": throughput.get("value"),
@@ -495,7 +514,7 @@ def main() -> int:
495514
"--payload-bytes",
496515
dest="payload_bytes",
497516
type=str,
498-
help="Comma-separated payload sizes in bytes, e.g. 8,8192.",
517+
help=("Comma-separated payload sizes in bytes, e.g. 8,8192."),
499518
)
500519
parser.add_argument(
501520
"--program",

0 commit comments

Comments
 (0)