|
17 | 17 |
|
18 | 18 | ``` |
19 | 19 | $ ./scripts/scalebench.py --mem-bw --program u64 --workers 1,4,8,16,20 \ |
20 | | - --payload 8 --duration 120 --repetition 1 \ |
| 20 | + --payload-bytes 8 --duration 120 --repetition 1 \ |
21 | 21 | --version-suffix dbsp-sorting |
22 | 22 | ``` |
23 | 23 | """ |
|
41 | 41 | from feldera import FelderaClient, PipelineBuilder |
42 | 42 | from feldera.runtime_config import RuntimeConfig |
43 | 43 |
|
44 | | -PROGRAMS = ["u64", "binary", "string", "binary_primary_key"] |
| 44 | +PROGRAMS = ["u64", "u64N", "binary", "string", "binary_primary_key"] |
45 | 45 | WORKER_COUNTS = [1, 2, 4, 8, 12, 16, 20] |
46 | 46 | DEFAULT_PAYLOAD_BYTES = [8, 128, 512, 4096, 32768] |
47 | 47 | BENCH_DURATION_S = 120 |
@@ -91,13 +91,36 @@ def normalize_payload_bytes( |
91 | 91 | "Program u64 does not support payload sizes other than 8 bytes." |
92 | 92 | ) |
93 | 93 | return [8] |
| 94 | + if program == "u64N": |
| 95 | + invalid = [size for size in payload_bytes if size % 8 != 0] |
| 96 | + if invalid: |
| 97 | + invalid_csv = ",".join(str(size) for size in invalid) |
| 98 | + raise SystemExit( |
| 99 | + "Program u64N requires payload sizes that are multiples of 8 bytes. " |
| 100 | + f"Invalid values: {invalid_csv}" |
| 101 | + ) |
94 | 102 | return payload_bytes |
95 | 103 |
|
96 | 104 |
|
97 | 105 | def make_sql(program: str, datagen_workers: int, payload_bytes: int) -> str: |
98 | 106 | if program == "u64": |
99 | 107 | table_fields = " payload BIGINT NOT NULL\n" |
100 | 108 | fields = None |
| 109 | + elif program == "u64N": |
| 110 | + if payload_bytes % 8 != 0: |
| 111 | + raise ValueError( |
| 112 | + f"Program u64N requires payload-bytes to be a multiple of 8, got {payload_bytes}" |
| 113 | + ) |
| 114 | + u64_count = payload_bytes // 8 |
| 115 | + if u64_count <= 0: |
| 116 | + raise ValueError( |
| 117 | + f"Program u64N requires payload-bytes >= 8, got {payload_bytes}" |
| 118 | + ) |
| 119 | + table_fields = ",\n".join( |
| 120 | + f" payload{i} BIGINT NOT NULL" for i in range(u64_count) |
| 121 | + ) |
| 122 | + table_fields += "\n" |
| 123 | + fields = {f"payload{i}": {"strategy": "uniform"} for i in range(u64_count)} |
101 | 124 | elif program == "binary": |
102 | 125 | table_fields = " payload BINARY NOT NULL\n" |
103 | 126 | fields = { |
@@ -400,11 +423,7 @@ def extract_metrics(payload: Dict[str, Any], pipeline_name: str) -> Dict[str, An |
400 | 423 | storage = metrics.get("storage") or {} |
401 | 424 | uptime = metrics.get("uptime") or {} |
402 | 425 | state_amp = metrics.get("state-amplification") or {} |
403 | | - buffered = ( |
404 | | - metrics.get("buffered_input_records") |
405 | | - or metrics.get("buffered-input-records") |
406 | | - or {} |
407 | | - ) |
| 426 | + buffered = metrics.get("buffered-input-records") or {} |
408 | 427 |
|
409 | 428 | return { |
410 | 429 | "throughput_value": throughput.get("value"), |
@@ -495,7 +514,7 @@ def main() -> int: |
495 | 514 | "--payload-bytes", |
496 | 515 | dest="payload_bytes", |
497 | 516 | type=str, |
498 | | - help="Comma-separated payload sizes in bytes, e.g. 8,8192.", |
| 517 | + help=("Comma-separated payload sizes in bytes, e.g. 8,8192."), |
499 | 518 | ) |
500 | 519 | parser.add_argument( |
501 | 520 | "--program", |
|
0 commit comments