Skip to content

Commit 772ac9b

Browse files
committed
benchmarking: add join microbenchmark.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 317b6b6 commit 772ac9b

File tree

1 file changed

+129
-4
lines changed

1 file changed

+129
-4
lines changed

scripts/scalebench.py

Lines changed: 129 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,15 @@
4141
from feldera import FelderaClient, PipelineBuilder
4242
from feldera.runtime_config import RuntimeConfig
4343

44-
PROGRAMS = ["u64", "u64N", "binary", "string", "binary_primary_key"]
44+
PROGRAMS = [
45+
"u64",
46+
"u64N",
47+
"u64Nprimary",
48+
"u64Njoin-no-match",
49+
"binary",
50+
"string",
51+
"binary_primary_key",
52+
]
4553
WORKER_COUNTS = [1, 2, 4, 8, 12, 16, 20]
4654
DEFAULT_PAYLOAD_BYTES = [8, 128, 512, 4096, 32768]
4755
BENCH_DURATION_S = 120
@@ -91,12 +99,12 @@ def normalize_payload_bytes(
9199
"Program u64 does not support payload sizes other than 8 bytes."
92100
)
93101
return [8]
94-
if program == "u64N":
102+
if program in ("u64N", "u64Nprimary", "u64Njoin-no-match"):
95103
invalid = [size for size in payload_bytes if size % 8 != 0]
96104
if invalid:
97105
invalid_csv = ",".join(str(size) for size in invalid)
98106
raise SystemExit(
99-
"Program u64N requires payload sizes that are multiples of 8 bytes. "
107+
f"Program {program} requires payload sizes that are multiples of 8 bytes. "
100108
f"Invalid values: {invalid_csv}"
101109
)
102110
return payload_bytes
@@ -121,6 +129,123 @@ def make_sql(program: str, datagen_workers: int, payload_bytes: int) -> str:
121129
)
122130
table_fields += "\n"
123131
fields = {f"payload{i}": {"strategy": "uniform"} for i in range(u64_count)}
132+
elif program == "u64Nprimary":
133+
if payload_bytes % 8 != 0:
134+
raise ValueError(
135+
"Program u64Nprimary requires payload-bytes to be a multiple of 8, "
136+
f"got {payload_bytes}"
137+
)
138+
u64_count = payload_bytes // 8
139+
if u64_count <= 0:
140+
raise ValueError(
141+
f"Program u64Nprimary requires payload-bytes >= 8, got {payload_bytes}"
142+
)
143+
table_fields = ",\n".join(
144+
(
145+
f" payload{i} BIGINT NOT NULL PRIMARY KEY"
146+
if i == 0
147+
else f" payload{i} BIGINT NOT NULL"
148+
)
149+
for i in range(u64_count)
150+
)
151+
table_fields += "\n"
152+
fields = {
153+
f"payload{i}": {"strategy": "uniform"} for i in range(1, u64_count)
154+
} or None
155+
elif program == "u64Njoin-no-match":
156+
if payload_bytes % 8 != 0:
157+
raise ValueError(
158+
"Program u64Njoin-no-match requires payload-bytes to be a multiple "
159+
f"of 8, got {payload_bytes}"
160+
)
161+
u64_count = payload_bytes // 8
162+
if u64_count <= 0:
163+
raise ValueError(
164+
"Program u64Njoin-no-match requires payload-bytes >= 8, "
165+
f"got {payload_bytes}"
166+
)
167+
168+
# Use identical schemas and join on payload0, which is the primary key.
169+
# Left and right generate keys from disjoint ranges, so the inner join
170+
# never matches.
171+
table_fields = ",\n".join(
172+
(
173+
f" payload{i} BIGINT NOT NULL PRIMARY KEY"
174+
if i == 0
175+
else f" payload{i} BIGINT NOT NULL"
176+
)
177+
for i in range(u64_count)
178+
)
179+
table_fields += "\n"
180+
181+
left_fields = {
182+
"payload0": {
183+
"strategy": "increment",
184+
"range": [0, 1_000_000_000],
185+
}
186+
}
187+
right_fields = {
188+
"payload0": {
189+
"strategy": "increment",
190+
"range": [1_000_000_000, 2_000_000_000],
191+
}
192+
}
193+
for i in range(1, u64_count):
194+
left_fields[f"payload{i}"] = {"strategy": "uniform"}
195+
right_fields[f"payload{i}"] = {"strategy": "uniform"}
196+
197+
left_connectors = json.dumps(
198+
[
199+
{
200+
"name": "data_left",
201+
"labels": ["backfill"],
202+
"transport": {
203+
"name": "datagen",
204+
"config": {
205+
"workers": datagen_workers,
206+
"plan": [{"limit": 50_000_000, "fields": left_fields}],
207+
},
208+
},
209+
}
210+
],
211+
separators=(",", ":"),
212+
)
213+
right_connectors = json.dumps(
214+
[
215+
{
216+
"name": "data_right",
217+
"start_after": "backfill",
218+
"transport": {
219+
"name": "datagen",
220+
"config": {
221+
"workers": datagen_workers,
222+
"plan": [{"fields": right_fields}],
223+
},
224+
},
225+
}
226+
],
227+
separators=(",", ":"),
228+
)
229+
230+
return (
231+
"CREATE TABLE left_table (\n"
232+
f"{table_fields}"
233+
") WITH (\n"
234+
# " 'materialized' = 'true',\n"
235+
f" 'connectors' = '{left_connectors}'\n"
236+
");\n"
237+
"CREATE TABLE right_table (\n"
238+
f"{table_fields}"
239+
") WITH (\n"
240+
# " 'materialized' = 'true',\n"
241+
f" 'connectors' = '{right_connectors}'\n"
242+
");\n"
243+
"CREATE MATERIALIZED VIEW simple AS\n"
244+
"SELECT l.payload0 AS payload0\n"
245+
"FROM left_table AS l\n"
246+
"INNER JOIN right_table AS r\n"
247+
" ON l.payload0 = r.payload0;"
248+
)
124249
elif program == "binary":
125250
table_fields = " payload BINARY NOT NULL\n"
126251
fields = {
@@ -637,7 +762,7 @@ def main() -> int:
637762
f"workers={workers} payload_kib={payload_kib:.3f}"
638763
)
639764

640-
datagen_workers = max(8, workers + 8)
765+
datagen_workers = max(16, (workers * 3) + 8)
641766
sql = make_sql(
642767
program=program,
643768
datagen_workers=datagen_workers,

0 commit comments

Comments
 (0)