Skip to content

Commit a284687

Browse files
blpgz
authored andcommitted
benchmark: Add number of late drops to logged data.
Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent ce67b7e commit a284687

File tree

1 file changed

+8
-9
lines changed

1 file changed

+8
-9
lines changed

benchmark/feldera-sql/run.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def wait_for_status(pipeline_name, status):
118118

119119
def write_results(results, outfile):
120120
writer = csv.writer(outfile)
121-
writer.writerow(['when', 'runner', 'mode', 'language', 'name', 'num_cores', 'num_events', 'elapsed', 'peak_memory_bytes', 'cpu_msecs'])
121+
writer.writerow(['when', 'runner', 'mode', 'language', 'name', 'num_cores', 'num_events', 'elapsed', 'peak_memory_bytes', 'cpu_msecs', 'late_drops'])
122122
writer.writerows(results)
123123

124124
def write_metrics(keys, results, outfile):
@@ -262,7 +262,6 @@ def main():
262262
# Wait till the pipeline is completed
263263
start = time.time()
264264
last_processed = 0
265-
last_metrics = 0
266265
peak_memory = 0
267266
error = False
268267
while True:
@@ -278,12 +277,6 @@ def main():
278277
processed = global_metrics["total_processed_records"]
279278
peak_memory = max(peak_memory, global_metrics["rss_bytes"])
280279
cpu_msecs = global_metrics.get("cpu_msecs", 0)
281-
if processed > last_processed:
282-
before, after = ('\r', '') if os.isatty(1) else ('', '\n')
283-
peak_gib = peak_memory / 1024 / 1024 / 1024
284-
cpu_secs = cpu_msecs / 1000
285-
sys.stdout.write(f"{before}Pipeline {full_name} processed {processed} records in {elapsed:.1f} seconds ({peak_gib:.1f} GiB peak memory, {cpu_secs:.1f} s CPU time){after}")
286-
last_metrics = elapsed
287280
metrics_dict = {"name":pipeline_name, "elapsed_seconds":elapsed}
288281
for key, value in global_metrics.items():
289282
metrics_seen.add(key)
@@ -304,6 +297,12 @@ def main():
304297
metrics_seen.add(k)
305298
metrics_dict[k] = value["Histogram"][v]
306299
pipeline_metrics += [metrics_dict]
300+
late_drops = metrics_dict.get("records_late", 0)
301+
if processed > last_processed:
302+
before, after = ('\r', '') if os.isatty(1) else ('', '\n')
303+
peak_gib = peak_memory / 1024 / 1024 / 1024
304+
cpu_secs = cpu_msecs / 1000
305+
sys.stdout.write(f"{before}Pipeline {full_name} processed {processed} records in {elapsed:.1f} seconds ({peak_gib:.1f} GiB peak memory, {cpu_secs:.1f} s CPU time, {late_drops} late drops){after}")
307306
last_processed = processed
308307
if stats["global_metrics"]["pipeline_complete"]:
309308
break
@@ -316,7 +315,7 @@ def main():
316315
else:
317316
print(f"Pipeline {full_name} completed in {elapsed} s")
318317

319-
results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed, peak_memory, cpu_msecs]]
318+
results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed, peak_memory, cpu_msecs, late_drops]]
320319

321320
if profile:
322321
response = requests.get(f"{api_url}/v0/pipelines/{full_name}/circuit_profile", headers=headers)

0 commit comments

Comments
 (0)