1010
1111import json
1212import logging
13+ import math
1314import os
1415import time
15- from dataclasses import dataclass
16+ from dataclasses import dataclass , field
1617from datetime import datetime , timezone
1718from enum import Enum
1819from typing import TYPE_CHECKING , Optional
@@ -56,13 +57,56 @@ def _human_readable_bytes(n: int) -> str:
5657 return f"{ value :.2f} { units [exp ]} "
5758
5859
60+ def _stddev (values : list [float ]) -> float :
61+ """Population standard deviation."""
62+ n = len (values )
63+ if n < 2 :
64+ return 0.0
65+ mean = sum (values ) / n
66+ return math .sqrt (sum ((x - mean ) ** 2 for x in values ) / n )
67+
68+
def _averaged_metrics(runs: list["BenchmarkResult"]) -> "BenchmarkMetrics":
    """Compute averaged BenchmarkMetrics from a list of runs.

    Averages: throughput, uptime_ms, buffered_input_records_avg, state_amplification
    Min-of-mins: memory_bytes_min, storage_bytes_min, buffered_input_records_min
    Max-of-maxes: memory_bytes_max, storage_bytes_max, buffered_input_records_max
    """
    all_metrics = [run.metrics for run in runs]
    count = len(all_metrics)

    # Small attribute-driven reducers keep the field-by-field aggregation terse.
    def _mean_int(attr: str) -> int:
        return int(sum(getattr(m, attr) for m in all_metrics) / count)

    def _minimum(attr: str) -> int:
        return min(getattr(m, attr) for m in all_metrics)

    def _maximum(attr: str) -> int:
        return max(getattr(m, attr) for m in all_metrics)

    # state_amplification may be None per run; average only the present values.
    amplifications = [
        m.state_amplification
        for m in all_metrics
        if m.state_amplification is not None
    ]
    avg_amplification = (
        sum(amplifications) / len(amplifications) if amplifications else None
    )

    return BenchmarkMetrics(
        throughput=_mean_int("throughput"),
        memory_bytes_max=_maximum("memory_bytes_max"),
        memory_bytes_min=_minimum("memory_bytes_min"),
        storage_bytes_max=_maximum("storage_bytes_max"),
        storage_bytes_min=_minimum("storage_bytes_min"),
        uptime_ms=_mean_int("uptime_ms"),
        buffered_input_records_avg=_mean_int("buffered_input_records_avg"),
        buffered_input_records_min=_minimum("buffered_input_records_min"),
        buffered_input_records_max=_maximum("buffered_input_records_max"),
        state_amplification=avg_amplification,
    )
101+
102+
59103@dataclass
60104class RawSample :
61105 """One stats snapshot from ``pipeline.stats()``.
62106
63107 :param rss_bytes: Resident set size of the pipeline process in bytes.
64- :param runtime_elapsed_msecs: Pipeline uptime in milliseconds at the time
65- of the snapshot .
108+ :param uptime_msecs: Real-world elapsed milliseconds since the pipeline
109+ process started (wall-clock time, not aggregated CPU time) .
66110 :param incarnation_uuid: UUID identifying the current pipeline incarnation.
67111 Changes if the pipeline restarts.
68112 :param storage_bytes: Bytes currently stored by the pipeline.
@@ -75,7 +119,7 @@ class RawSample:
75119 """
76120
77121 rss_bytes : int
78- runtime_elapsed_msecs : int
122+ uptime_msecs : int
79123 incarnation_uuid : str
80124 storage_bytes : int
81125 buffered_input_records : int
@@ -109,7 +153,7 @@ def from_pipeline_statistics(cls, stats) -> "RawSample":
109153
110154 return cls (
111155 rss_bytes = gm .rss_bytes or 0 ,
112- runtime_elapsed_msecs = gm .runtime_elapsed_msecs or 0 ,
156+ uptime_msecs = int ( gm .uptime_msecs or 0 ) ,
113157 incarnation_uuid = str (gm .incarnation_uuid ),
114158 storage_bytes = gm .storage_bytes or 0 ,
115159 buffered_input_records = gm .buffered_input_records or 0 ,
@@ -123,21 +167,23 @@ def from_pipeline_statistics(cls, stats) -> "RawSample":
123167class BenchmarkMetrics :
124168 """Aggregated benchmark metrics derived from a list of :class:`RawSample`.
125169
126- :param throughput: Records processed per second, computed from the last
127- sample's ``total_processed_records`` and ``runtime_elapsed_msecs`` .
170+ :param throughput: Records processed per second during the measurement
171+ window, computed as a delta between the first and last samples .
128172 :param memory_bytes_max: Peak RSS memory usage in bytes across all samples.
129173 :param memory_bytes_min: Minimum RSS memory usage in bytes across all samples.
130174 :param storage_bytes_max: Peak storage usage in bytes across all samples.
131175 :param storage_bytes_min: Minimum storage usage in bytes across all samples.
132- :param uptime_ms: Pipeline uptime in milliseconds from the last sample.
176+ :param uptime_ms: Real-world elapsed milliseconds of the measurement window
177+ (delta of ``uptime_msecs`` between first and last sample).
133178 :param buffered_input_records_avg: Average buffered input record count across
134179 all samples (integer division).
135180 :param buffered_input_records_min: Minimum buffered input record count across
136181 all samples.
137182 :param buffered_input_records_max: Maximum buffered input record count across
138183 all samples.
139- :param state_amplification: Ratio of peak storage bytes to total input bytes
140- from the last sample. ``None`` when total input bytes is zero.
184+ :param state_amplification: Ratio of peak storage bytes to input bytes
185+ received during the measurement window. ``None`` when delta input
186+ bytes is zero.
141187 """
142188
143189 throughput : int
@@ -165,10 +211,14 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
165211 "No measurements were recorded. Maybe try to increase `duration`."
166212 )
167213
214+ first = samples [0 ]
168215 last = samples [- 1 ]
169216
170- uptime_s = last .runtime_elapsed_msecs / 1000.0
171- throughput = int (last .total_processed_records / uptime_s ) if uptime_s > 0 else 0
217+ # Delta-based: measure only the records and time within the window
218+ delta_records = last .total_processed_records - first .total_processed_records
219+ delta_ms = last .uptime_msecs - first .uptime_msecs
220+ delta_s = delta_ms / 1000.0
221+ throughput = int (delta_records / delta_s ) if delta_s > 0 else 0
172222
173223 memory_bytes_max = max (s .rss_bytes for s in samples )
174224 memory_bytes_min = min (s .rss_bytes for s in samples )
@@ -189,9 +239,9 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
189239 zero_count ,
190240 )
191241
192- input_bytes = last .input_bytes
242+ delta_input_bytes = last . input_bytes - first .input_bytes
193243 state_amplification = (
194- storage_bytes_max / input_bytes if input_bytes > 0 else None
244+ storage_bytes_max / delta_input_bytes if delta_input_bytes > 0 else None
195245 )
196246
197247 return cls (
@@ -200,7 +250,7 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
200250 memory_bytes_min = memory_bytes_min ,
201251 storage_bytes_max = storage_bytes_max ,
202252 storage_bytes_min = storage_bytes_min ,
203- uptime_ms = last . runtime_elapsed_msecs ,
253+ uptime_ms = delta_ms ,
204254 buffered_input_records_avg = buffered_input_records_avg ,
205255 buffered_input_records_min = buffered_input_records_min ,
206256 buffered_input_records_max = buffered_input_records_max ,
@@ -212,16 +262,48 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
212262class BenchmarkResult :
213263 """A named benchmark result with timing information.
214264
265+ For a single run, ``_metrics`` is set directly. For multi-run aggregation,
266+ ``runs`` holds the individual results and ``metrics`` computes averages
267+ on the fly.
268+
215269 :param name: Benchmark name, used as the top-level key in BMF output.
216- :param metrics: Aggregated performance metrics.
217270 :param start_time: UTC timestamp when metric collection began.
218271 :param end_time: UTC timestamp when metric collection ended.
272+ :param runs: List of individual run results (multi-run aggregation).
273+ :param _metrics: Aggregated performance metrics (single run).
219274 """
220275
221276 name : str
222- metrics : BenchmarkMetrics
223277 start_time : datetime
224278 end_time : datetime
279+ runs : list ["BenchmarkResult" ] | None = None
280+ _metrics : BenchmarkMetrics | None = field (default = None , repr = False )
281+
282+ @property
283+ def metrics (self ) -> BenchmarkMetrics :
284+ """Return metrics — stored for single run, computed for multi-run."""
285+ if self ._metrics is not None :
286+ return self ._metrics
287+ if self .runs :
288+ return _averaged_metrics (self .runs )
289+ raise ValueError ("BenchmarkResult has no metrics and no runs" )
290+
291+ @classmethod
292+ def aggregate (
293+ cls , results : list ["BenchmarkResult" ], name : str | None = None
294+ ) -> "BenchmarkResult" :
295+ """Create an aggregated result from multiple runs.
296+
297+ :param results: Non-empty list of individual run results.
298+ :param name: Optional name override; defaults to the first result's name.
299+ :returns: A new :class:`BenchmarkResult` with ``runs`` set.
300+ """
301+ return cls (
302+ name = name or results [0 ].name ,
303+ start_time = min (r .start_time for r in results ),
304+ end_time = max (r .end_time for r in results ),
305+ runs = results ,
306+ )
225307
226308 def to_bmf (self ) -> dict :
227309 """Return the result as a Bencher Metric Format (BMF) dict.
@@ -261,51 +343,67 @@ def to_json(self) -> str:
261343 def format_table (self ) -> str :
262344 """Return a human-readable tabular display of the benchmark results.
263345
346+ For multi-run results (``self.runs is not None``), value cells show
347+ ``avg (stddev X.Y%)`` computed from per-run metrics.
348+
264349 :returns: A multi-line string containing an ASCII table with one row
265- per metric showing its value, lower bound, and upper bound .
350+ per metric.
266351 """
267352 m = self .metrics
268- rows = [
269- ("Metric" , "Value" , "Lower" , "Upper" ),
270- (
271- "Throughput (records/s)" ,
272- str (m .throughput ),
273- "-" ,
274- "-" ,
275- ),
276- (
277- "Memory" ,
278- _human_readable_bytes (m .memory_bytes_max ),
279- _human_readable_bytes (m .memory_bytes_min ),
280- "-" ,
281- ),
282- (
283- "Storage" ,
284- _human_readable_bytes (m .storage_bytes_max ),
285- _human_readable_bytes (m .storage_bytes_min ),
286- "-" ,
287- ),
288- (
289- "Uptime [ms]" ,
290- str (m .uptime_ms ),
291- "-" ,
292- "-" ,
293- ),
353+ is_multi = self .runs is not None and len (self .runs ) > 1
354+
355+ def _val_with_stddev (avg : float , values : list [float ], fmt : str = ".0f" ) -> str :
356+ if not is_multi :
357+ return f"{ avg :{fmt }} "
358+ sd = _stddev (values )
359+ pct = (sd / avg * 100 ) if avg != 0 else 0.0
360+ return f"{ avg :{fmt }} (stddev { pct :.1f} %)"
361+
362+ def _bytes_with_stddev (avg : int , values : list [int ]) -> str :
363+ if not is_multi :
364+ return _human_readable_bytes (avg )
365+ sd = _stddev ([float (v ) for v in values ])
366+ pct = (sd / avg * 100 ) if avg != 0 else 0.0
367+ return f"{ _human_readable_bytes (avg )} (stddev { pct :.1f} %)"
368+
369+ rows : list [tuple [str , str ]] = [
370+ ("Metric" , "Value" ),
371+ ("Throughput (records/s)" , _val_with_stddev (
372+ m .throughput ,
373+ [r .metrics .throughput for r in self .runs ] if is_multi else [],
374+ )),
375+ ("Memory" , _bytes_with_stddev (
376+ m .memory_bytes_max ,
377+ [r .metrics .memory_bytes_max for r in self .runs ] if is_multi else [],
378+ )),
379+ ("Storage" , _bytes_with_stddev (
380+ m .storage_bytes_max ,
381+ [r .metrics .storage_bytes_max for r in self .runs ] if is_multi else [],
382+ )),
383+ ("Uptime [ms]" , _val_with_stddev (
384+ m .uptime_ms ,
385+ [r .metrics .uptime_ms for r in self .runs ] if is_multi else [],
386+ )),
294387 ]
295388 if m .state_amplification is not None :
296- rows .append (
297- (
298- "State Amplification" ,
299- f"{ m .state_amplification :.2f} " ,
300- "-" ,
301- "-" ,
302- )
303- )
389+ rows .append (("State Amplification" , _val_with_stddev (
390+ m .state_amplification ,
391+ [
392+ r .metrics .state_amplification
393+ for r in self .runs
394+ if r .metrics .state_amplification is not None
395+ ]
396+ if is_multi
397+ else [],
398+ fmt = ".2f" ,
399+ )))
304400
305401 col_widths = [max (len (row [i ]) for row in rows ) for i in range (len (rows [0 ]))]
306402 sep = "+-" + "-+-" .join ("-" * w for w in col_widths ) + "-+"
307403
308- lines = ["Benchmark Results:" , sep ]
404+ n_runs = len (self .runs ) if self .runs else 1
405+ header = f"Benchmark Results ({ n_runs } run{ 's' if n_runs != 1 else '' } ):"
406+ lines = [header , sep ]
309407 for i , row in enumerate (rows ):
310408 line = (
311409 "| "
@@ -375,7 +473,11 @@ def collect_metrics(
375473 elif completion_condition == CompletionCondition .IDLE :
376474 now = time .monotonic ()
377475 cur_processed = sample .total_processed_records
378- if prev_processed is not None and cur_processed == prev_processed and cur_processed > 0 :
476+ if (
477+ prev_processed is not None
478+ and cur_processed == prev_processed
479+ and cur_processed > 0
480+ ):
379481 if idle_started_at is None :
380482 idle_started_at = now
381483 elif now - idle_started_at >= idle_interval_s :
@@ -435,9 +537,9 @@ def bench(
435537 metrics = BenchmarkMetrics .from_samples (samples )
436538 return BenchmarkResult (
437539 name = benchmark_name ,
438- metrics = metrics ,
439540 start_time = start_time ,
440541 end_time = end_time ,
542+ _metrics = metrics ,
441543 )
442544
443545
0 commit comments