@@ -265,9 +265,12 @@ def wait_for_status(pipeline_name, status):
 
 def write_results(results, outfile):
     writer = csv.writer(outfile)
-    writer.writerow(['when', 'runner', 'mode', 'language', 'name', 'num_cores', 'num_events', 'elapsed'])
+    writer.writerow(['when', 'runner', 'mode', 'language', 'name', 'num_cores', 'num_events', 'elapsed', 'peak_memory_bytes'])
     writer.writerows(results)
 
+def write_metrics(results, outfile):
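+    # Unlike write_results(), the caller supplies the header row as the first element of results.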
+    writer = csv.writer(outfile)
+    writer.writerows(results)
 
 def main():
     # Command-line arguments
@@ -278,13 +281,15 @@ def main():
     parser.add_argument("--kafka-broker", required=True, help="Kafka broker (e.g., localhost:9092)")
     parser.add_argument("--cores", type=int, help="Number of cores to use for workers (default: 16)")
     parser.add_argument('--lateness', action=argparse.BooleanOptionalAction, help='whether to use lateness for GC to save memory (default: --lateness)')
-    parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-lateness)')
+    parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-merge)')
     parser.add_argument('--storage', action=argparse.BooleanOptionalAction, help='whether to enable storage (default: --no-storage)')
     parser.add_argument('--min-storage-rows', type=int, help='If storage is enabled, the minimum number of rows to write a batch to storage.')
     parser.add_argument('--query', action='append', help='queries to run (by default, all queries), specify one or more of: ' + ','.join(sort_queries(QUERY_SQL.keys())))
     parser.add_argument('--input-topic-suffix', help='suffix to apply to input topic names (by default, "")')
     parser.add_argument('--csv', help='File to write results in .csv format')
-    parser.set_defaults(lateness=True, merge=False, storage=False, cores=16)
+    parser.add_argument('--csv-metrics', help='File to write pipeline metrics (memory, disk) in .csv format')
+    parser.add_argument('--metrics-interval', help='How often metrics should be sampled, in seconds (default: 1)')
+    parser.set_defaults(lateness=True, merge=False, storage=False, cores=16, metrics_interval=1)
 
     global api_url, kafka_broker
     api_url = parser.parse_args().api_url
@@ -299,6 +304,8 @@ def main():
         min_storage_rows = int(min_storage_rows)
     suffix = parser.parse_args().input_topic_suffix or ''
     csvfile = parser.parse_args().csv
+    csvmetricsfile = parser.parse_args().csv_metrics
+    metricsinterval = float(parser.parse_args().metrics_interval)
 
     output_connector_names = queries
     if merge and len(queries) > 1:
@@ -357,6 +364,14 @@ def main():
 
     # Run the pipelines
     results = []
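+    # Header and per-sample rows for the --csv-metrics output.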
+    pipeline_metric_names = ["name", "elapsed_seconds", "rss_bytes", "buffered_input_records", "total_input_records", "total_processed_records"]
+    disk_metric_names = ["total_files_created", "total_bytes_written", "total_writes_success", "buffer_cache_hit"]
+    disk_write_latency_names = ["count", "first", "middle", "last", "minimum", "maximum", "mean"]
+    for s in disk_metric_names:
+        pipeline_metric_names += ["disk_" + s]
+    for s in disk_write_latency_names:
+        pipeline_metric_names += ["disk_write_latency_histogram_" + s]
+    pipeline_metrics = [pipeline_metric_names]
     for pipeline_name in queries:
         start = time.time()
 
@@ -367,32 +382,63 @@ def main():
         # Wait till the pipeline is completed
         start = time.time()
         last_processed = 0
+        last_metrics = 0
+        peak_memory = 0
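+        # Poll /stats until the pipeline completes, sampling metrics every metricsinterval seconds and tracking peak RSS.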
         while True:
             stats = requests.get(f"{api_url}/v0/pipelines/{pipeline_name}/stats").json()
             elapsed = time.time() - start
-            processed = stats["global_metrics"]["total_processed_records"]
-            if processed > last_processed:
-                before, after = ('\r', '') if os.isatty(1) else ('', '\n')
-                sys.stdout.write(f"{before}Pipeline {pipeline_name} processed {processed} records in {elapsed:.1f} seconds{after}")
-                last_processed = processed
-            if stats["global_metrics"]["pipeline_complete"]:
-                break
+            if "global_metrics" in stats:
+                global_metrics = stats["global_metrics"]
+                processed = global_metrics["total_processed_records"]
+                peak_memory = max(peak_memory, global_metrics["rss_bytes"])
+                if processed > last_processed:
+                    before, after = ('\r', '') if os.isatty(1) else ('', '\n')
+                    sys.stdout.write(f"{before}Pipeline {pipeline_name} processed {processed} records in {elapsed:.1f} seconds{after}")
+                if elapsed - last_metrics > metricsinterval:
+                    last_metrics = elapsed
+                    metrics_list = [pipeline_name, elapsed, global_metrics["rss_bytes"], global_metrics["buffered_input_records"], global_metrics["total_input_records"], global_metrics["total_processed_records"]]
+                    disk_index = len(metrics_list)
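+                    # Pad with 11 blank cells: 4 disk counters plus 7 write-latency histogram fields.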
+                    for i in range(11):
+                        metrics_list += [""]
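+                    # Each entry in stats["metrics"] is expected to carry a "key" and a "value" wrapping either a "Counter" or a "Histogram".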
+                    for s in stats["metrics"]:
+                        if s["key"] == "disk.total_files_created":
+                            metrics_list[disk_index] = s["value"]["Counter"]
+                        elif s["key"] == "disk.total_bytes_written":
+                            metrics_list[disk_index + 1] = s["value"]["Counter"]
+                        elif s["key"] == "disk.total_writes_success":
+                            metrics_list[disk_index + 2] = s["value"]["Counter"]
+                        elif s["key"] == "disk.buffer_cache_hit":
+                            metrics_list[disk_index + 3] = s["value"]["Counter"]
+                        elif s["key"] == "disk.write_latency" and s["value"]["Histogram"] is not None:
+                            metrics_list[disk_index + 4] = s["value"]["Histogram"]["count"]
+                            metrics_list[disk_index + 5] = s["value"]["Histogram"]["first"]
+                            metrics_list[disk_index + 6] = s["value"]["Histogram"]["middle"]
+                            metrics_list[disk_index + 7] = s["value"]["Histogram"]["last"]
+                            metrics_list[disk_index + 8] = s["value"]["Histogram"]["minimum"]
+                            metrics_list[disk_index + 9] = s["value"]["Histogram"]["maximum"]
+                            metrics_list[disk_index + 10] = s["value"]["Histogram"]["mean"]
+                    pipeline_metrics += [metrics_list]
+                last_processed = processed
+                if stats["global_metrics"]["pipeline_complete"]:
+                    break
             time.sleep(.1)
         if os.isatty(1):
             print()
         elapsed = "{:.1f}".format(time.time() - start)
         print(f"Pipeline {pipeline_name} completed in {elapsed}s")
 
-        results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed]]
+        results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed, peak_memory]]
 
         # Stop the pipeline
         elapsed = stop_pipeline(pipeline_name, True)
         print(f"Stopped pipeline {pipeline_name} in {elapsed:.1f}s")
-
+
     write_results(results, sys.stdout)
     if csvfile is not None:
         write_results(results, open(csvfile, 'w', newline=''))
-
+    if csvmetricsfile is not None:
+        write_metrics(pipeline_metrics, open(csvmetricsfile, 'w', newline=''))
 
 if __name__ == "__main__":
     main()
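
For reference, the per-pipeline samples written via `--csv-metrics` can be summarized after a run. A minimal post-processing sketch (not part of the patch; the file name `metrics.csv` and the use of pandas are assumptions):

```python
# Summarize a --csv-metrics file: peak RSS per pipeline, in bytes.
# Column names ("name", "rss_bytes") match the header row the patch writes.
import pandas as pd

df = pd.read_csv("metrics.csv")
print(df.groupby("name")["rss_bytes"].max())
```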