4141from feldera import FelderaClient , PipelineBuilder
4242from feldera .runtime_config import RuntimeConfig
4343
44- PROGRAMS = ["u64" , "u64N" , "binary" , "string" , "binary_primary_key" ]
44+ PROGRAMS = [
45+ "u64" ,
46+ "u64N" ,
47+ "u64Nprimary" ,
48+ "u64Njoin-no-match" ,
49+ "binary" ,
50+ "string" ,
51+ "binary_primary_key" ,
52+ ]
4553WORKER_COUNTS = [1 , 2 , 4 , 8 , 12 , 16 , 20 ]
4654DEFAULT_PAYLOAD_BYTES = [8 , 128 , 512 , 4096 , 32768 ]
4755BENCH_DURATION_S = 120
@@ -91,12 +99,12 @@ def normalize_payload_bytes(
9199 "Program u64 does not support payload sizes other than 8 bytes."
92100 )
93101 return [8 ]
94- if program == "u64N" :
102+ if program in ( "u64N" , "u64Nprimary" , "u64Njoin-no-match" ) :
95103 invalid = [size for size in payload_bytes if size % 8 != 0 ]
96104 if invalid :
97105 invalid_csv = "," .join (str (size ) for size in invalid )
98106 raise SystemExit (
99- "Program u64N requires payload sizes that are multiples of 8 bytes. "
107+ f "Program { program } requires payload sizes that are multiples of 8 bytes. "
100108 f"Invalid values: { invalid_csv } "
101109 )
102110 return payload_bytes
@@ -121,6 +129,123 @@ def make_sql(program: str, datagen_workers: int, payload_bytes: int) -> str:
121129 )
122130 table_fields += "\n "
123131 fields = {f"payload{ i } " : {"strategy" : "uniform" } for i in range (u64_count )}
132+ elif program == "u64Nprimary" :
133+ if payload_bytes % 8 != 0 :
134+ raise ValueError (
135+ "Program u64Nprimary requires payload-bytes to be a multiple of 8, "
136+ f"got { payload_bytes } "
137+ )
138+ u64_count = payload_bytes // 8
139+ if u64_count <= 0 :
140+ raise ValueError (
141+ f"Program u64Nprimary requires payload-bytes >= 8, got { payload_bytes } "
142+ )
143+ table_fields = ",\n " .join (
144+ (
145+ f" payload{ i } BIGINT NOT NULL PRIMARY KEY"
146+ if i == 0
147+ else f" payload{ i } BIGINT NOT NULL"
148+ )
149+ for i in range (u64_count )
150+ )
151+ table_fields += "\n "
152+ fields = {
153+ f"payload{ i } " : {"strategy" : "uniform" } for i in range (1 , u64_count )
154+ } or None
155+ elif program == "u64Njoin-no-match" :
156+ if payload_bytes % 8 != 0 :
157+ raise ValueError (
158+ "Program u64Njoin-no-match requires payload-bytes to be a multiple "
159+ f"of 8, got { payload_bytes } "
160+ )
161+ u64_count = payload_bytes // 8
162+ if u64_count <= 0 :
163+ raise ValueError (
164+ "Program u64Njoin-no-match requires payload-bytes >= 8, "
165+ f"got { payload_bytes } "
166+ )
167+
168+ # Use identical schemas and join on payload0, which is the primary key.
169+ # Left and right generate keys from disjoint ranges, so the inner join
170+ # never matches.
171+ table_fields = ",\n " .join (
172+ (
173+ f" payload{ i } BIGINT NOT NULL PRIMARY KEY"
174+ if i == 0
175+ else f" payload{ i } BIGINT NOT NULL"
176+ )
177+ for i in range (u64_count )
178+ )
179+ table_fields += "\n "
180+
181+ left_fields = {
182+ "payload0" : {
183+ "strategy" : "increment" ,
184+ "range" : [0 , 1_000_000_000 ],
185+ }
186+ }
187+ right_fields = {
188+ "payload0" : {
189+ "strategy" : "increment" ,
190+ "range" : [1_000_000_000 , 2_000_000_000 ],
191+ }
192+ }
193+ for i in range (1 , u64_count ):
194+ left_fields [f"payload{ i } " ] = {"strategy" : "uniform" }
195+ right_fields [f"payload{ i } " ] = {"strategy" : "uniform" }
196+
197+ left_connectors = json .dumps (
198+ [
199+ {
200+ "name" : "data_left" ,
201+ "labels" : ["backfill" ],
202+ "transport" : {
203+ "name" : "datagen" ,
204+ "config" : {
205+ "workers" : datagen_workers ,
206+ "plan" : [{"limit" : 50_000_000 , "fields" : left_fields }],
207+ },
208+ },
209+ }
210+ ],
211+ separators = ("," , ":" ),
212+ )
213+ right_connectors = json .dumps (
214+ [
215+ {
216+ "name" : "data_right" ,
217+ "start_after" : "backfill" ,
218+ "transport" : {
219+ "name" : "datagen" ,
220+ "config" : {
221+ "workers" : datagen_workers ,
222+ "plan" : [{"fields" : right_fields }],
223+ },
224+ },
225+ }
226+ ],
227+ separators = ("," , ":" ),
228+ )
229+
230+ return (
231+ "CREATE TABLE left_table (\n "
232+ f"{ table_fields } "
233+ ") WITH (\n "
234+ # " 'materialized' = 'true',\n"
235+ f" 'connectors' = '{ left_connectors } '\n "
236+ ");\n "
237+ "CREATE TABLE right_table (\n "
238+ f"{ table_fields } "
239+ ") WITH (\n "
240+ # " 'materialized' = 'true',\n"
241+ f" 'connectors' = '{ right_connectors } '\n "
242+ ");\n "
243+ "CREATE MATERIALIZED VIEW simple AS\n "
244+ "SELECT l.payload0 AS payload0\n "
245+ "FROM left_table AS l\n "
246+ "INNER JOIN right_table AS r\n "
247+ " ON l.payload0 = r.payload0;"
248+ )
124249 elif program == "binary" :
125250 table_fields = " payload BINARY NOT NULL\n "
126251 fields = {
@@ -637,7 +762,7 @@ def main() -> int:
637762 f"workers={ workers } payload_kib={ payload_kib :.3f} "
638763 )
639764
640- datagen_workers = max (8 , workers + 8 )
765+ datagen_workers = max (16 , ( workers * 3 ) + 8 )
641766 sql = make_sql (
642767 program = program ,
643768 datagen_workers = datagen_workers ,
0 commit comments