Skip to content

Commit 35e47f9

Browse files
authored
Log SQL metrics (#1875)
Signed-off-by: Matei <matei@feldera.com>
1 parent 1bfaf9d commit 35e47f9

4 files changed

Lines changed: 65 additions & 15 deletions

File tree

Earthfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ benchmark:
538538
END
539539
SAVE ARTIFACT crates/nexmark/nexmark_results.csv AS LOCAL .
540540
SAVE ARTIFACT crates/nexmark/sql_nexmark_results.csv AS LOCAL .
541+
SAVE ARTIFACT crates/nexmark/sql_nexmark_metrics.csv AS LOCAL .
541542
SAVE ARTIFACT crates/nexmark/dram_nexmark_results.csv AS LOCAL .
542543
SAVE ARTIFACT crates/dbsp/galen_results.csv AS LOCAL .
543544
#SAVE ARTIFACT crates/dbsp/ldbc_results.csv AS LOCAL .

benchmark/feldera-sql/run.py

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,12 @@ def wait_for_status(pipeline_name, status):
265265

266266
def write_results(results, outfile):
267267
writer = csv.writer(outfile)
268-
writer.writerow(['when', 'runner', 'mode', 'language', 'name', 'num_cores', 'num_events', 'elapsed'])
268+
writer.writerow(['when', 'runner', 'mode', 'language', 'name', 'num_cores', 'num_events', 'elapsed', 'peak_memory_bytes'])
269269
writer.writerows(results)
270270

271+
def write_metrics(results, outfile):
272+
writer = csv.writer(outfile)
273+
writer.writerows(results)
271274

272275
def main():
273276
# Command-line arguments
@@ -278,13 +281,15 @@ def main():
278281
parser.add_argument("--kafka-broker", required=True, help="Kafka broker (e.g., localhost:9092 )")
279282
parser.add_argument("--cores", type=int, help="Number of cores to use for workers (default: 16)")
280283
parser.add_argument('--lateness', action=argparse.BooleanOptionalAction, help='whether to use lateness for GC to save memory (default: --lateness)')
281-
parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-lateness)')
284+
parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-merge)')
282285
parser.add_argument('--storage', action=argparse.BooleanOptionalAction, help='whether to enable storage (default: --no-storage)')
283286
parser.add_argument('--min-storage-rows', type=int, help='If storage is enabled, the minimum number of rows to write a batch to storage.')
284287
parser.add_argument('--query', action='append', help='queries to run (by default, all queries), specify one or more of: ' + ','.join(sort_queries(QUERY_SQL.keys())))
285288
parser.add_argument('--input-topic-suffix', help='suffix to apply to input topic names (by default, "")')
286289
parser.add_argument('--csv', help='File to write results in .csv format')
287-
parser.set_defaults(lateness=True, merge=False, storage=False, cores=16)
290+
parser.add_argument('--csv-metrics', help='File to write pipeline metrics (memory, disk) in .csv format')
291+
parser.add_argument('--metrics-interval', help='How often metrics should be sampled, in seconds (default: 1)')
292+
parser.set_defaults(lateness=True, merge=False, storage=False, cores=16, metrics_interval=1)
288293

289294
global api_url, kafka_broker
290295
api_url = parser.parse_args().api_url
@@ -299,6 +304,8 @@ def main():
299304
min_storage_rows = int(min_storage_rows)
300305
suffix = parser.parse_args().input_topic_suffix or ''
301306
csvfile = parser.parse_args().csv
307+
csvmetricsfile = parser.parse_args().csv_metrics
308+
metricsinterval = float(parser.parse_args().metrics_interval)
302309

303310
output_connector_names = queries
304311
if merge and len(queries) > 1:
@@ -357,6 +364,14 @@ def main():
357364

358365
# Run the pipelines
359366
results = []
367+
pipeline_metric_names = ["name", "elapsed_seconds", "rss_bytes", "buffered_input_records", "total_input_records", "total_processed_records"]
368+
disk_metric_names = ["total_files_created", "total_bytes_written", "total_writes_success", "buffer_cache_hit"]
369+
disk_write_latency_names = ["count", "first", "middle", "last", "minimum", "maximum", "mean"]
370+
for s in disk_metric_names:
371+
pipeline_metric_names += ["disk_" + s]
372+
for s in disk_write_latency_names:
373+
pipeline_metric_names += ["disk_write_latency_histogram_" + s]
374+
pipeline_metrics = [pipeline_metric_names]
360375
for pipeline_name in queries:
361376
start = time.time()
362377

@@ -367,32 +382,63 @@ def main():
367382
# Wait till the pipeline is completed
368383
start = time.time()
369384
last_processed = 0
385+
last_metrics = 0
386+
peak_memory = 0
370387
while True:
371388
stats = requests.get(f"{api_url}/v0/pipelines/{pipeline_name}/stats").json()
372389
elapsed = time.time() - start
373-
processed = stats["global_metrics"]["total_processed_records"]
374-
if processed > last_processed:
375-
before, after = ('\r', '') if os.isatty(1) else ('', '\n')
376-
sys.stdout.write(f"{before}Pipeline {pipeline_name} processed {processed} records in {elapsed:.1f} seconds{after}")
377-
last_processed = processed
378-
if stats["global_metrics"]["pipeline_complete"]:
379-
break
390+
if "global_metrics" in stats:
391+
global_metrics = stats["global_metrics"]
392+
processed = global_metrics["total_processed_records"]
393+
peak_memory = max(peak_memory, global_metrics["rss_bytes"])
394+
if processed > last_processed:
395+
before, after = ('\r', '') if os.isatty(1) else ('', '\n')
396+
sys.stdout.write(f"{before}Pipeline {pipeline_name} processed {processed} records in {elapsed:.1f} seconds{after}")
397+
if elapsed - last_metrics > metricsinterval:
398+
last_metrics = elapsed
399+
metrics_list = [pipeline_name, elapsed, global_metrics["rss_bytes"], global_metrics["buffered_input_records"], global_metrics["total_input_records"], global_metrics["total_processed_records"]]
400+
disk_index = len(metrics_list)
401+
for i in range(11):
402+
metrics_list += [""]
403+
for s in stats["metrics"]:
404+
if s["key"] == "disk.total_files_created":
405+
metrics_list[disk_index] = s["value"]["Counter"]
406+
elif s["key"] == "disk.total_bytes_written":
407+
metrics_list[disk_index + 1] = s["value"]["Counter"]
408+
elif s["key"] == "disk.total_writes_success":
409+
metrics_list[disk_index + 2] = s["value"]["Counter"]
410+
elif s["key"] == "disk.buffer_cache_hit":
411+
metrics_list[disk_index + 3] = s["value"]["Counter"]
412+
elif s["key"] == "disk.write_latency" and s["value"]["Histogram"] is not None:
413+
print(str(s))
414+
metrics_list[disk_index + 4] = s["value"]["Histogram"]["count"]
415+
metrics_list[disk_index + 5] = s["value"]["Histogram"]["first"]
416+
metrics_list[disk_index + 6] = s["value"]["Histogram"]["middle"]
417+
metrics_list[disk_index + 7] = s["value"]["Histogram"]["last"]
418+
metrics_list[disk_index + 8] = s["value"]["Histogram"]["minimum"]
419+
metrics_list[disk_index + 9] = s["value"]["Histogram"]["maximum"]
420+
metrics_list[disk_index + 10] = s["value"]["Histogram"]["mean"]
421+
pipeline_metrics += [metrics_list]
422+
last_processed = processed
423+
if stats["global_metrics"]["pipeline_complete"]:
424+
break
380425
time.sleep(.1)
381426
if os.isatty(1):
382427
print()
383428
elapsed = "{:.1f}".format(time.time() - start)
384429
print(f"Pipeline {pipeline_name} completed in {elapsed} s")
385430

386-
results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed]]
431+
results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed, peak_memory]]
387432

388433
# Stop pipeline
389434
elapsed = stop_pipeline(pipeline_name, True)
390435
print(f"Stopped pipeline {pipeline_name} in {elapsed:.1f} s")
391-
436+
392437
write_results(results, sys.stdout)
393438
if csvfile is not None:
394439
write_results(results, open(csvfile, 'w', newline=''))
395-
440+
if csvmetricsfile is not None:
441+
write_metrics(pipeline_metrics, open(csvmetricsfile, 'w', newline=''))
396442

397443
if __name__ == "__main__":
398444
main()

scripts/bench-publish.bash

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ fi
1818
NEXMARK_CSV_FILE='nexmark_results.csv'
1919
NEXMARK_DRAM_CSV_FILE='dram_nexmark_results.csv'
2020
NEXMARK_SQL_CSV_FILE='sql_nexmark_results.csv'
21+
NEXMARK_SQL_METRICS_CSV_FILE='sql_nexmark_metrics.csv'
2122
NEXMARK_PERSISTENCE_CSV_FILE='persistence_nexmark_results.csv'
2223
GALEN_CSV_FILE='galen_results.csv'
2324
LDBC_CSV_FILE='ldbc_results.csv'
@@ -46,11 +47,12 @@ fi
4647

4748
# Copy nexmark results
4849
mkdir -p ${DEPLOY_DIR}
49-
mv ${NEXMARK_CSV_FILE} ${NEXMARK_DRAM_CSV_FILE} ${NEXMARK_SQL_CSV_FILE} ${DEPLOY_DIR}
50+
mv ${NEXMARK_CSV_FILE} ${NEXMARK_DRAM_CSV_FILE} ${NEXMARK_SQL_CSV_FILE} ${NEXMARK_SQL_METRICS_CSV_FILE} ${DEPLOY_DIR}
5051
gzip -f ${DEPLOY_DIR}/${NEXMARK_CSV_FILE}
5152
#gzip -f ${DEPLOY_DIR}/${NEXMARK_PERSISTENCE_CSV_FILE}
5253
gzip -f ${DEPLOY_DIR}/${NEXMARK_DRAM_CSV_FILE}
5354
gzip -f ${DEPLOY_DIR}/${NEXMARK_SQL_CSV_FILE}
55+
gzip -f ${DEPLOY_DIR}/${NEXMARK_SQL_METRICS_CSV_FILE}
5456

5557
# Add galen results to repo
5658
DEPLOY_DIR="gh-pages/galen/${CI_MACHINE_TYPE}/${PR_COMMIT_SHA}/"

scripts/bench.bash

mode changed: 100755 → 100644
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ fi
1313
NEXMARK_CSV_FILE='nexmark_results.csv'
1414
NEXMARK_DRAM_CSV_FILE='dram_nexmark_results.csv'
1515
NEXMARK_SQL_CSV_FILE='sql_nexmark_results.csv'
16+
NEXMARK_SQL_METRICS_CSV_FILE='sql_nexmark_metrics.csv'
1617
NEXMARK_PERSISTENCE_CSV_FILE='persistence_nexmark_results.csv'
1718
GALEN_CSV_FILE='galen_results.csv'
1819
LDBC_CSV_FILE='ldbc_results.csv'
@@ -41,7 +42,7 @@ KAFKA_BROKER=localhost:9092
4142
rpk topic -X brokers=$KAFKA_BROKER delete bid auction person
4243
cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER
4344
FELDERA_API=http://localhost:8080
44-
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API --kafka-broker $KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_CSV_FILE}
45+
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API --kafka-broker $KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_METRICS_CSV_FILE} --metrics-interval 1
4546

4647
# Run galen benchmark
4748
cargo bench --bench galen --features="with-csv" -- --workers 10 --csv ${GALEN_CSV_FILE}

0 commit comments

Comments (0)