Skip to content

Commit 825a1de

Browse files
committed
benchmark: Convert SQL benchmarks for Nexmark and Tiktok to new API.
This removes two options. The `--merge` option is dropped because it was already broken and we hadn't noticed (so it couldn't have been too important). The `--output` option is dropped because keeping it would have taken a bit of work and we didn't use it. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent cd65f16 commit 825a1de

3 files changed

Lines changed: 50 additions & 103 deletions

File tree

benchmark/feldera-sql/benchmarks/nexmark/table.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ CREATE TABLE person (
77
state VARCHAR,
88
date_time TIMESTAMP(3) NOT NULL {lateness},
99
extra VARCHAR
10-
);
10+
) WITH ('connectors' = '{person}');
1111
CREATE TABLE auction (
1212
id BIGINT,
1313
itemName VARCHAR,
@@ -19,7 +19,7 @@ CREATE TABLE auction (
1919
seller BIGINT,
2020
category BIGINT,
2121
extra VARCHAR
22-
);
22+
) WITH ('connectors' = '{auction}');
2323
CREATE TABLE bid (
2424
auction BIGINT,
2525
bidder BIGINT,
@@ -28,4 +28,4 @@ CREATE TABLE bid (
2828
url VARCHAR,
2929
date_time TIMESTAMP(3) NOT NULL {lateness},
3030
extra VARCHAR
31-
);
31+
) WITH ('connectors' = '{bid}');

benchmark/feldera-sql/benchmarks/tiktok/table.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ CREATE TABLE interactions (
88
interaction_date TIMESTAMP LATENESS INTERVAL 15 MINUTES,
99
previous_interaction_date TIMESTAMP,
1010
interaction_month TIMESTAMP
11-
);
11+
) WITH ('connectors' = '{interactions}');

benchmark/feldera-sql/run.py

Lines changed: 46 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#! /usr/bin/python3
22

33
import csv
4+
import json
45
import os
56
import sys
67
import time
@@ -19,7 +20,7 @@ def load_queries(folder):
1920
queries[f.split('.')[0]] = file.read()
2021
return queries
2122

22-
def load_table(folder, with_lateness):
23+
def load_table(folder, with_lateness, suffix):
2324
p = os.path.join(FILE_DIR, folder + '/table.sql')
2425
file = open(p, 'r')
2526
text = file.read()
@@ -29,11 +30,9 @@ def load_table(folder, with_lateness):
2930
i = line.find(table_start_string)
3031
if i >= 0:
3132
inputs += [line[i + len(table_start_string):].split(' ')[0]]
32-
lateness = ''
33-
if with_lateness:
34-
lateness = "LATENESS INTERVAL 4 SECONDS"
35-
text = text.replace('{lateness}', lateness)
36-
return [text, inputs]
33+
subst = {input: make_connector(input, suffix) for input in inputs}
34+
subst["lateness"] = "LATENESS INTERVAL 4 SECONDS"
35+
return text.format(**subst)
3736

3837
def sort_queries(queries):
3938
return sorted(queries, key=lambda q: int(q[1:]))
@@ -60,46 +59,28 @@ def parse_queries(all_queries, arg):
6059

6160
return queries
6261

63-
def add_connector(connector_name, relation_name, is_input):
64-
transport_type = "kafka_" + ("input" if is_input else "output")
65-
json = {
66-
"description": "",
67-
"config": {
68-
"transport": {
69-
"name": transport_type,
70-
"config": {
71-
"auto.offset.reset": "earliest"
72-
} | kafka_options
73-
},
74-
"format": {
75-
"name": "csv",
76-
"config": {}
77-
}
78-
}
79-
}
80-
config = json["config"]["transport"]["config"]
81-
if is_input:
82-
config["enable.partition.eof"] = "true"
83-
config["topics"] = [connector_name]
84-
else:
85-
config["topic"] = connector_name
86-
requests.put(f"{api_url}/v0/connectors/{connector_name}", headers=headers, json=json).raise_for_status()
87-
return {
88-
"connector_name": connector_name,
89-
"is_input": is_input,
90-
"name": connector_name,
91-
"relation_name": relation_name,
62+
def make_connector(topic, suffix):
63+
name = "kafka_input"
64+
config = {
65+
"topics": [topic + suffix],
66+
"enable.partition.eof": "true",
67+
"auto.offset.reset": "earliest"
9268
}
9369

70+
return json.dumps([{
71+
"format": {
72+
"name": "csv",
73+
"config": {},
74+
},
75+
"transport": {
76+
"name": name,
77+
"config": config | kafka_options
78+
}
79+
}], indent=4)
80+
9481
def get_full_name(folder, name):
9582
return folder.split('/')[-1] + '-' + name
9683

97-
def add_input_connector(connector_name, relation_name):
98-
return add_connector(connector_name, relation_name, True)
99-
100-
def add_output_connector(connector_name, relation_name):
101-
return add_connector(connector_name, relation_name, False)
102-
10384
def stop_pipeline(pipeline_name, wait):
10485
requests.post(f"{api_url}/v0/pipelines/{pipeline_name}/shutdown", headers=headers).raise_for_status()
10586
if wait:
@@ -112,7 +93,7 @@ def start_pipeline(pipeline_name, wait):
11293

11394
def wait_for_status(pipeline_name, status):
11495
start = time.time()
115-
while requests.get(f"{api_url}/v0/pipelines/{pipeline_name}", headers=headers).json()["state"]["current_status"] != status:
96+
while requests.get(f"{api_url}/v0/pipelines/{pipeline_name}", headers=headers).json()["deployment_status"] != status:
11697
time.sleep(.1)
11798
return time.time() - start
11899

@@ -145,8 +126,6 @@ def main():
145126
help="Kafka options passed as -O option=value, e.g., -O bootstrap.servers=localhost:9092")
146127
parser.add_argument("--cores", type=int, help="Number of cores to use for workers (default: 16)")
147128
parser.add_argument('--lateness', action=argparse.BooleanOptionalAction, help='whether to use lateness for GC to save memory (default: --lateness)')
148-
parser.add_argument('--output', action=argparse.BooleanOptionalAction, help='whether to write query output back to Kafka (default: --no-output)')
149-
parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-merge)')
150129
parser.add_argument('--storage', action=argparse.BooleanOptionalAction, help='whether to enable storage (default: --no-storage)')
151130
parser.add_argument("--poller-threads", required=False, type=int, help="Override number of poller threads to use")
152131
parser.add_argument('--min-storage-bytes', type=int, help='If storage is enabled, the minimum number of bytes to write a batch to storage.')
@@ -156,7 +135,7 @@ def main():
156135
parser.add_argument('--csv', help='File to write results in .csv format')
157136
parser.add_argument('--csv-metrics', help='File to write pipeline metrics (memory, disk) in .csv format')
158137
parser.add_argument('--metrics-interval', help='How often metrics should be sampled, in seconds (default: 1)')
159-
parser.set_defaults(lateness=True, output=False, merge=False, storage=False, cores=16, metrics_interval=1, folder='benchmarks/nexmark')
138+
parser.set_defaults(lateness=True, storage=False, cores=16, metrics_interval=1, folder='benchmarks/nexmark')
160139

161140
global api_url, kafka_options, headers
162141
api_url = parser.parse_args().api_url
@@ -166,12 +145,12 @@ def main():
166145
for option_value in parser.parse_args().option:
167146
option, value = option_value.split("=")
168147
kafka_options[option] = value
169-
with_lateness = parser.parse_args().lateness
170-
save_output = parser.parse_args().output
171-
merge = parser.parse_args().merge
148+
suffix = parser.parse_args().input_topic_suffix or ''
149+
172150
folder = parser.parse_args().folder
151+
table = load_table(folder, parser.parse_args().lateness, suffix)
173152
all_queries = load_queries(folder)
174-
table, inputs = load_table(folder, with_lateness)
153+
175154
queries = sort_queries(parse_queries(all_queries, parser.parse_args().query))
176155
cores = int(parser.parse_args().cores)
177156
storage = parser.parse_args().storage
@@ -181,65 +160,21 @@ def main():
181160
min_storage_bytes = parser.parse_args().min_storage_bytes
182161
if min_storage_bytes is not None:
183162
min_storage_bytes = int(min_storage_bytes)
184-
suffix = parser.parse_args().input_topic_suffix or ''
185163
csvfile = parser.parse_args().csv
186164
csvmetricsfile = parser.parse_args().csv_metrics
187165
metricsinterval = float(parser.parse_args().metrics_interval)
188166

189-
output_connector_names = queries
190-
if merge and len(queries) > 1:
191-
merged_name = ','.join(queries)
192-
QUERY_SQL[merged_name] = '\n'.join([QUERY_SQL[q] for q in queries])
193-
queries = [merged_name]
194-
195167
when = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time()))
196168

169+
print("Creating programs...")
197170
for program_name in queries:
198171
# Create program
199172
full_name = get_full_name(folder, program_name)
200173
program_sql = table + all_queries[program_name]
201-
response = requests.put(f"{api_url}/v0/programs/{full_name}", headers=headers, json={
174+
requests.put(f"{api_url}/v0/pipelines/{full_name}", headers=headers, json={
175+
"name": full_name,
202176
"description": f"Benchmark: {full_name}",
203-
"code": program_sql,
204-
"config": {
205-
"profile": "optimized"
206-
}
207-
})
208-
response.raise_for_status()
209-
program_version = response.json()["version"]
210-
211-
# Compile program
212-
213-
requests.post(f"{api_url}/v0/programs/{full_name}/compile", headers=headers, json={"version": program_version}).raise_for_status()
214-
print(f"Compiling program(s)...")
215-
for program_name in queries:
216-
full_name = get_full_name(folder, program_name)
217-
while True:
218-
status = requests.get(f"{api_url}/v0/programs/{full_name}", headers=headers).json()["status"]
219-
print(f"Program {full_name} status: {status}")
220-
if status == "Success":
221-
break
222-
elif status != "Pending" and status != "CompilingRust" and status != "CompilingSql":
223-
raise RuntimeError(f"Failed program compilation with status {status}")
224-
time.sleep(5)
225-
226-
input_connectors = [add_input_connector(s + suffix, s) for s in inputs]
227-
if save_output:
228-
output_connectors = {}
229-
for name in output_connector_names:
230-
output_connectors[name] = add_output_connector(name, name)
231-
232-
# Create pipelines
233-
print("Creating pipeline(s)...")
234-
for program_name in queries:
235-
pipeline_name = get_full_name(folder, program_name)
236-
if save_output:
237-
connectors = input_connectors + [output_connectors[s] for s in program_name.split(',')]
238-
else:
239-
connectors = input_connectors
240-
requests.put(f"{api_url}/v0/pipelines/{pipeline_name}", headers=headers, json={
241-
"description": "",
242-
"config": {
177+
"runtime_config": {
243178
"workers": cores,
244179
"storage": storage,
245180
"min_storage_bytes": min_storage_bytes,
@@ -253,10 +188,22 @@ def main():
253188
# "storage_class": "..."
254189
}
255190
},
256-
"program_name": pipeline_name,
257-
"connectors": connectors
191+
"program_config": {},
192+
"program_code": program_sql,
258193
}).raise_for_status()
259194

195+
print("Compiling program(s)...")
196+
for program_name in queries:
197+
full_name = get_full_name(folder, program_name)
198+
while True:
199+
status = requests.get(f"{api_url}/v0/pipelines/{full_name}", headers=headers).json()["program_status"]
200+
print(f"Program {full_name} status: {status}")
201+
if status == "Success":
202+
break
203+
elif status != "Pending" and status != "CompilingRust" and status != "CompilingSql":
204+
raise RuntimeError(f"Failed program compilation with status {status}")
205+
time.sleep(5)
206+
260207
# Stop pipelines
261208
print("Stopping pipeline(s)...")
262209
for pipeline_name in queries:

0 commit comments

Comments
 (0)