Skip to content

Commit b072b8e

Browse files
committed
benchmark: API key and Kafka option arguments
Changes to the end-to-end SQL benchmark run script:
- Argument to supply additional Kafka options, such as for SSL and SASL configuration
- Argument to supply an API key, which is included as an HTTP header with each request
- Specify the optimized compilation profile
- Comments for configuring pipeline resources

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 4bcf205 commit b072b8e

File tree

4 files changed

+42
-24
lines changed

4 files changed

+42
-24
lines changed

benchmark/feldera-sql/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,5 @@ This step has the following sub-steps:
9898
To run the benchmark itself, run:
9999

100100
```
101-
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API --kafka-broker $KAFKA_FROM_FELDERA
101+
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_FROM_FELDERA
102102
```

benchmark/feldera-sql/run.py

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,8 @@ def add_connector(connector_name, relation_name, is_input):
214214
"transport": {
215215
"name": transport_type,
216216
"config": {
217-
"auto.offset.reset": "earliest",
218-
"bootstrap.servers": kafka_broker,
219-
"enable.ssl.certificate.verification": "true",
220-
"sasl.mechanism": "PLAIN",
221-
"security.protocol": "PLAINTEXT",
222-
}
217+
"auto.offset.reset": "earliest"
218+
} | kafka_options
223219
},
224220
"format": {
225221
"name": "csv",
@@ -233,7 +229,7 @@ def add_connector(connector_name, relation_name, is_input):
233229
config["topics"] = [connector_name]
234230
else:
235231
config["topic"] = connector_name
236-
requests.put(f"{api_url}/v0/connectors/{connector_name}", json=json).raise_for_status()
232+
requests.put(f"{api_url}/v0/connectors/{connector_name}", headers=headers, json=json).raise_for_status()
237233
return {
238234
"connector_name": connector_name,
239235
"is_input": is_input,
@@ -248,18 +244,18 @@ def add_output_connector(connector_name, relation_name):
248244
return add_connector(connector_name, relation_name, False)
249245

250246
def stop_pipeline(pipeline_name, wait):
251-
requests.post(f"{api_url}/v0/pipelines/{pipeline_name}/shutdown").raise_for_status()
247+
requests.post(f"{api_url}/v0/pipelines/{pipeline_name}/shutdown", headers=headers).raise_for_status()
252248
if wait:
253249
return wait_for_status(pipeline_name, "Shutdown")
254250

255251
def start_pipeline(pipeline_name, wait):
256-
requests.post(f"{api_url}/v0/pipelines/{pipeline_name}/start").raise_for_status()
252+
requests.post(f"{api_url}/v0/pipelines/{pipeline_name}/start", headers=headers).raise_for_status()
257253
if wait:
258254
return wait_for_status(pipeline_name, "Running")
259255

260256
def wait_for_status(pipeline_name, status):
261257
start = time.time()
262-
while requests.get(f"{api_url}/v0/pipelines/{pipeline_name}").json()["state"]["current_status"] != status:
258+
while requests.get(f"{api_url}/v0/pipelines/{pipeline_name}", headers=headers).json()["state"]["current_status"] != status:
263259
time.sleep(.1)
264260
return time.time() - start
265261

@@ -287,7 +283,9 @@ def main():
287283
description='Nexmark benchmark demo'
288284
)
289285
parser.add_argument("--api-url", required=True, help="Feldera API URL (e.g., http://localhost:8080 )")
290-
parser.add_argument("--kafka-broker", required=True, help="Kafka broker (e.g., localhost:9092 )")
286+
parser.add_argument("--api-key", required=False, help="Feldera API key (e.g., \"apikey:0123456789ABCDEF\")")
287+
parser.add_argument("-O", "--option", action='append', required=True,
288+
help="Kafka options passed as -O option=value, e.g., -O bootstrap.servers=localhost:9092")
291289
parser.add_argument("--cores", type=int, help="Number of cores to use for workers (default: 16)")
292290
parser.add_argument('--lateness', action=argparse.BooleanOptionalAction, help='whether to use lateness for GC to save memory (default: --lateness)')
293291
parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-merge)')
@@ -300,9 +298,14 @@ def main():
300298
parser.add_argument('--metrics-interval', help='How often metrics should be sampled, in seconds (default: 1)')
301299
parser.set_defaults(lateness=True, merge=False, storage=False, cores=16, metrics_interval=1)
302300

303-
global api_url, kafka_broker
301+
global api_url, kafka_options, headers
304302
api_url = parser.parse_args().api_url
305-
kafka_broker = parser.parse_args().kafka_broker
303+
api_key = parser.parse_args().api_key
304+
headers = {} if api_key is None else {"authorization": f"Bearer {api_key}"}
305+
kafka_options = {}
306+
for option_value in parser.parse_args().option:
307+
option, value = option_value.split("=")
308+
kafka_options[option] = value
306309
with_lateness = parser.parse_args().lateness
307310
merge = parser.parse_args().merge
308311
queries = sort_queries(parse_queries(parser.parse_args().query))
@@ -327,20 +330,23 @@ def main():
327330
for program_name in queries:
328331
# Create program
329332
program_sql = table_sql(with_lateness) + QUERY_SQL[program_name]
330-
response = requests.put(f"{api_url}/v0/programs/{program_name}", json={
333+
response = requests.put(f"{api_url}/v0/programs/{program_name}", headers=headers, json={
331334
"description": f"Nexmark benchmark: {program_name}",
332-
"code": program_sql
335+
"code": program_sql,
336+
"config": {
337+
"profile": "optimized"
338+
}
333339
})
334340
response.raise_for_status()
335341
program_version = response.json()["version"]
336342

337343
# Compile program
338-
requests.post(f"{api_url}/v0/programs/{program_name}/compile", json={"version": program_version}).raise_for_status()
339344

345+
requests.post(f"{api_url}/v0/programs/{program_name}/compile", headers=headers, json={"version": program_version}).raise_for_status()
340346
print(f"Compiling program(s)...")
341347
for program_name in queries:
342348
while True:
343-
status = requests.get(f"{api_url}/v0/programs/{program_name}").json()["status"]
349+
status = requests.get(f"{api_url}/v0/programs/{program_name}", headers=headers).json()["status"]
344350
print(f"Program {program_name} status: {status}")
345351
if status == "Success":
346352
break
@@ -357,9 +363,21 @@ def main():
357363
print("Creating pipeline(s)...")
358364
for program_name in queries:
359365
pipeline_name = program_name
360-
requests.put(f"{api_url}/v0/pipelines/{pipeline_name}", json={
366+
requests.put(f"{api_url}/v0/pipelines/{pipeline_name}", headers=headers, json={
361367
"description": "",
362-
"config": {"workers": cores, "storage": storage, "min_storage_rows": min_storage_rows},
368+
"config": {
369+
"workers": cores,
370+
"storage": storage,
371+
"min_storage_rows": min_storage_rows,
372+
"resources": {
373+
# "cpu_cores_min": 0,
374+
# "cpu_cores_max": 16,
375+
# "memory_mb_min": 100,
376+
# "memory_mb_max": 32000,
377+
# "storage_mb_max": 128000,
378+
# "storage_class": "..."
379+
}
380+
},
363381
"program_name": program_name,
364382
"connectors": input_connectors + [output_connectors[s] for s in program_name.split(',')],
365383
}).raise_for_status()
@@ -389,7 +407,7 @@ def main():
389407
last_metrics = 0
390408
peak_memory = 0
391409
while True:
392-
stats = requests.get(f"{api_url}/v0/pipelines/{pipeline_name}/stats").json()
410+
stats = requests.get(f"{api_url}/v0/pipelines/{pipeline_name}/stats", headers=headers).json()
393411
elapsed = time.time() - start
394412
if "global_metrics" in stats:
395413
global_metrics = stats["global_metrics"]

benchmark/run-nexmark.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ case $runner:$language in
473473
CARGO=$(find_program cargo)
474474
run_log feldera-sql/run.py \
475475
--api-url="$api_url" \
476-
--kafka-broker="${kafka_from_feldera:-${kafka_broker}}" \
476+
-O bootstrap.servers="${kafka_from_feldera:-${kafka_broker}}" \
477477
--cores $cores \
478478
--input-topic-suffix "-$events" \
479479
--csv results.csv \

scripts/bench.bash

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ KAFKA_BROKER=localhost:9092
4848
rpk topic -X brokers=$KAFKA_BROKER delete bid auction person
4949
cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER
5050
FELDERA_API=http://localhost:8080
51-
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API --kafka-broker $KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_METRICS_CSV_FILE} --metrics-interval 1
51+
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_METRICS_CSV_FILE} --metrics-interval 1
5252

5353
rpk topic -X brokers=$KAFKA_BROKER delete bid auction person
5454
cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER
5555
FELDERA_API=http://localhost:8080
56-
python3 benchmark/feldera-sql/run.py --storage --api-url $FELDERA_API --kafka-broker $KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_STORAGE_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_STORAGE_METRICS_CSV_FILE} --metrics-interval 1
56+
python3 benchmark/feldera-sql/run.py --storage --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_STORAGE_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_STORAGE_METRICS_CSV_FILE} --metrics-interval 1
5757

5858
# Run galen benchmark
5959
cargo bench --bench galen -- --workers 10 --csv ${GALEN_CSV_FILE}

Comments (0)