Skip to content

Commit 11efd85

Browse files
committed
[python-sdk] Add aggregation to BenchmarkResult
Signed-off-by: Heorhii Bulakh <bulakh.96@gmail.com>
1 parent 6292a0d commit 11efd85

File tree

1 file changed

+157
-55
lines changed

1 file changed

+157
-55
lines changed

python/feldera/benchmarking.py

Lines changed: 157 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010

1111
import json
1212
import logging
13+
import math
1314
import os
1415
import time
15-
from dataclasses import dataclass
16+
from dataclasses import dataclass, field
1617
from datetime import datetime, timezone
1718
from enum import Enum
1819
from typing import TYPE_CHECKING, Optional
@@ -56,13 +57,56 @@ def _human_readable_bytes(n: int) -> str:
5657
return f"{value:.2f} {units[exp]}"
5758

5859

60+
def _stddev(values: list[float]) -> float:
61+
"""Population standard deviation."""
62+
n = len(values)
63+
if n < 2:
64+
return 0.0
65+
mean = sum(values) / n
66+
return math.sqrt(sum((x - mean) ** 2 for x in values) / n)
67+
68+
69+
def _averaged_metrics(runs: list["BenchmarkResult"]) -> "BenchmarkMetrics":
70+
"""Compute averaged BenchmarkMetrics from a list of runs.
71+
72+
Averages: throughput, uptime_ms, buffered_input_records_avg, state_amplification
73+
Min-of-mins: memory_bytes_min, storage_bytes_min, buffered_input_records_min
74+
Max-of-maxes: memory_bytes_max, storage_bytes_max, buffered_input_records_max
75+
"""
76+
n = len(runs)
77+
metrics_list = [r.metrics for r in runs]
78+
79+
sa_values = [
80+
m.state_amplification for m in metrics_list if m.state_amplification is not None
81+
]
82+
83+
return BenchmarkMetrics(
84+
throughput=int(sum(m.throughput for m in metrics_list) / n),
85+
memory_bytes_max=max(m.memory_bytes_max for m in metrics_list),
86+
memory_bytes_min=min(m.memory_bytes_min for m in metrics_list),
87+
storage_bytes_max=max(m.storage_bytes_max for m in metrics_list),
88+
storage_bytes_min=min(m.storage_bytes_min for m in metrics_list),
89+
uptime_ms=int(sum(m.uptime_ms for m in metrics_list) / n),
90+
buffered_input_records_avg=int(
91+
sum(m.buffered_input_records_avg for m in metrics_list) / n
92+
),
93+
buffered_input_records_min=min(
94+
m.buffered_input_records_min for m in metrics_list
95+
),
96+
buffered_input_records_max=max(
97+
m.buffered_input_records_max for m in metrics_list
98+
),
99+
state_amplification=sum(sa_values) / len(sa_values) if sa_values else None,
100+
)
101+
102+
59103
@dataclass
60104
class RawSample:
61105
"""One stats snapshot from ``pipeline.stats()``.
62106
63107
:param rss_bytes: Resident set size of the pipeline process in bytes.
64-
:param runtime_elapsed_msecs: Pipeline uptime in milliseconds at the time
65-
of the snapshot.
108+
:param uptime_msecs: Real-world elapsed milliseconds since the pipeline
109+
process started (wall-clock time, not aggregated CPU time).
66110
:param incarnation_uuid: UUID identifying the current pipeline incarnation.
67111
Changes if the pipeline restarts.
68112
:param storage_bytes: Bytes currently stored by the pipeline.
@@ -75,7 +119,7 @@ class RawSample:
75119
"""
76120

77121
rss_bytes: int
78-
runtime_elapsed_msecs: int
122+
uptime_msecs: int
79123
incarnation_uuid: str
80124
storage_bytes: int
81125
buffered_input_records: int
@@ -109,7 +153,7 @@ def from_pipeline_statistics(cls, stats) -> "RawSample":
109153

110154
return cls(
111155
rss_bytes=gm.rss_bytes or 0,
112-
runtime_elapsed_msecs=gm.runtime_elapsed_msecs or 0,
156+
uptime_msecs=int(gm.uptime_msecs or 0),
113157
incarnation_uuid=str(gm.incarnation_uuid),
114158
storage_bytes=gm.storage_bytes or 0,
115159
buffered_input_records=gm.buffered_input_records or 0,
@@ -123,21 +167,23 @@ def from_pipeline_statistics(cls, stats) -> "RawSample":
123167
class BenchmarkMetrics:
124168
"""Aggregated benchmark metrics derived from a list of :class:`RawSample`.
125169
126-
:param throughput: Records processed per second, computed from the last
127-
sample's ``total_processed_records`` and ``runtime_elapsed_msecs``.
170+
:param throughput: Records processed per second during the measurement
171+
window, computed as a delta between the first and last samples.
128172
:param memory_bytes_max: Peak RSS memory usage in bytes across all samples.
129173
:param memory_bytes_min: Minimum RSS memory usage in bytes across all samples.
130174
:param storage_bytes_max: Peak storage usage in bytes across all samples.
131175
:param storage_bytes_min: Minimum storage usage in bytes across all samples.
132-
:param uptime_ms: Pipeline uptime in milliseconds from the last sample.
176+
:param uptime_ms: Real-world elapsed milliseconds of the measurement window
177+
(delta of ``uptime_msecs`` between first and last sample).
133178
:param buffered_input_records_avg: Average buffered input record count across
134179
all samples (integer division).
135180
:param buffered_input_records_min: Minimum buffered input record count across
136181
all samples.
137182
:param buffered_input_records_max: Maximum buffered input record count across
138183
all samples.
139-
:param state_amplification: Ratio of peak storage bytes to total input bytes
140-
from the last sample. ``None`` when total input bytes is zero.
184+
:param state_amplification: Ratio of peak storage bytes to input bytes
185+
received during the measurement window. ``None`` when delta input
186+
bytes is zero.
141187
"""
142188

143189
throughput: int
@@ -165,10 +211,14 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
165211
"No measurements were recorded. Maybe try to increase `duration`."
166212
)
167213

214+
first = samples[0]
168215
last = samples[-1]
169216

170-
uptime_s = last.runtime_elapsed_msecs / 1000.0
171-
throughput = int(last.total_processed_records / uptime_s) if uptime_s > 0 else 0
217+
# Delta-based: measure only the records and time within the window
218+
delta_records = last.total_processed_records - first.total_processed_records
219+
delta_ms = last.uptime_msecs - first.uptime_msecs
220+
delta_s = delta_ms / 1000.0
221+
throughput = int(delta_records / delta_s) if delta_s > 0 else 0
172222

173223
memory_bytes_max = max(s.rss_bytes for s in samples)
174224
memory_bytes_min = min(s.rss_bytes for s in samples)
@@ -189,9 +239,9 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
189239
zero_count,
190240
)
191241

192-
input_bytes = last.input_bytes
242+
delta_input_bytes = last.input_bytes - first.input_bytes
193243
state_amplification = (
194-
storage_bytes_max / input_bytes if input_bytes > 0 else None
244+
storage_bytes_max / delta_input_bytes if delta_input_bytes > 0 else None
195245
)
196246

197247
return cls(
@@ -200,7 +250,7 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
200250
memory_bytes_min=memory_bytes_min,
201251
storage_bytes_max=storage_bytes_max,
202252
storage_bytes_min=storage_bytes_min,
203-
uptime_ms=last.runtime_elapsed_msecs,
253+
uptime_ms=delta_ms,
204254
buffered_input_records_avg=buffered_input_records_avg,
205255
buffered_input_records_min=buffered_input_records_min,
206256
buffered_input_records_max=buffered_input_records_max,
@@ -212,16 +262,48 @@ def from_samples(cls, samples: list) -> "BenchmarkMetrics":
212262
class BenchmarkResult:
213263
"""A named benchmark result with timing information.
214264
265+
For a single run, ``_metrics`` is set directly. For multi-run aggregation,
266+
``runs`` holds the individual results and ``metrics`` computes averages
267+
on the fly.
268+
215269
:param name: Benchmark name, used as the top-level key in BMF output.
216-
:param metrics: Aggregated performance metrics.
217270
:param start_time: UTC timestamp when metric collection began.
218271
:param end_time: UTC timestamp when metric collection ended.
272+
:param runs: List of individual run results (multi-run aggregation).
273+
:param _metrics: Aggregated performance metrics (single run).
219274
"""
220275

221276
name: str
222-
metrics: BenchmarkMetrics
223277
start_time: datetime
224278
end_time: datetime
279+
runs: list["BenchmarkResult"] | None = None
280+
_metrics: BenchmarkMetrics | None = field(default=None, repr=False)
281+
282+
@property
def metrics(self) -> BenchmarkMetrics:
    """Metrics for this result.

    A single-run result returns its stored metrics; a multi-run aggregate
    computes the average over ``self.runs`` on demand.

    :raises ValueError: If neither stored metrics nor runs are present.
    """
    stored = self._metrics
    if stored is not None:
        # Single-run result: metrics were captured directly.
        return stored
    if not self.runs:
        raise ValueError("BenchmarkResult has no metrics and no runs")
    # Multi-run aggregate: average the per-run metrics on the fly.
    return _averaged_metrics(self.runs)
290+
291+
@classmethod
def aggregate(
    cls, results: list["BenchmarkResult"], name: str | None = None
) -> "BenchmarkResult":
    """Create an aggregated result from multiple runs.

    :param results: Non-empty list of individual run results.
    :param name: Optional name override; defaults to the first result's name.
    :returns: A new :class:`BenchmarkResult` with ``runs`` set.
    :raises ValueError: If ``results`` is empty (previously surfaced as an
        opaque ``IndexError`` / ``min() arg is an empty sequence``).
    """
    if not results:
        raise ValueError("Cannot aggregate an empty list of results")
    return cls(
        name=name or results[0].name,
        # The aggregate window spans the earliest start to the latest end.
        start_time=min(r.start_time for r in results),
        end_time=max(r.end_time for r in results),
        runs=results,
    )
225307

226308
def to_bmf(self) -> dict:
227309
"""Return the result as a Bencher Metric Format (BMF) dict.
@@ -261,51 +343,67 @@ def to_json(self) -> str:
261343
def format_table(self) -> str:
262344
"""Return a human-readable tabular display of the benchmark results.
263345
346+
For multi-run results (``self.runs is not None``), value cells show
347+
``avg (stddev X.Y%)`` computed from per-run metrics.
348+
264349
:returns: A multi-line string containing an ASCII table with one row
265-
per metric showing its value, lower bound, and upper bound.
350+
per metric.
266351
"""
267352
m = self.metrics
268-
rows = [
269-
("Metric", "Value", "Lower", "Upper"),
270-
(
271-
"Throughput (records/s)",
272-
str(m.throughput),
273-
"-",
274-
"-",
275-
),
276-
(
277-
"Memory",
278-
_human_readable_bytes(m.memory_bytes_max),
279-
_human_readable_bytes(m.memory_bytes_min),
280-
"-",
281-
),
282-
(
283-
"Storage",
284-
_human_readable_bytes(m.storage_bytes_max),
285-
_human_readable_bytes(m.storage_bytes_min),
286-
"-",
287-
),
288-
(
289-
"Uptime [ms]",
290-
str(m.uptime_ms),
291-
"-",
292-
"-",
293-
),
353+
is_multi = self.runs is not None and len(self.runs) > 1
354+
355+
def _val_with_stddev(avg: float, values: list[float], fmt: str = ".0f") -> str:
356+
if not is_multi:
357+
return f"{avg:{fmt}}"
358+
sd = _stddev(values)
359+
pct = (sd / avg * 100) if avg != 0 else 0.0
360+
return f"{avg:{fmt}} (stddev {pct:.1f}%)"
361+
362+
def _bytes_with_stddev(avg: int, values: list[int]) -> str:
363+
if not is_multi:
364+
return _human_readable_bytes(avg)
365+
sd = _stddev([float(v) for v in values])
366+
pct = (sd / avg * 100) if avg != 0 else 0.0
367+
return f"{_human_readable_bytes(avg)} (stddev {pct:.1f}%)"
368+
369+
rows: list[tuple[str, str]] = [
370+
("Metric", "Value"),
371+
("Throughput (records/s)", _val_with_stddev(
372+
m.throughput,
373+
[r.metrics.throughput for r in self.runs] if is_multi else [],
374+
)),
375+
("Memory", _bytes_with_stddev(
376+
m.memory_bytes_max,
377+
[r.metrics.memory_bytes_max for r in self.runs] if is_multi else [],
378+
)),
379+
("Storage", _bytes_with_stddev(
380+
m.storage_bytes_max,
381+
[r.metrics.storage_bytes_max for r in self.runs] if is_multi else [],
382+
)),
383+
("Uptime [ms]", _val_with_stddev(
384+
m.uptime_ms,
385+
[r.metrics.uptime_ms for r in self.runs] if is_multi else [],
386+
)),
294387
]
295388
if m.state_amplification is not None:
296-
rows.append(
297-
(
298-
"State Amplification",
299-
f"{m.state_amplification:.2f}",
300-
"-",
301-
"-",
302-
)
303-
)
389+
rows.append(("State Amplification", _val_with_stddev(
390+
m.state_amplification,
391+
[
392+
r.metrics.state_amplification
393+
for r in self.runs
394+
if r.metrics.state_amplification is not None
395+
]
396+
if is_multi
397+
else [],
398+
fmt=".2f",
399+
)))
304400

305401
col_widths = [max(len(row[i]) for row in rows) for i in range(len(rows[0]))]
306402
sep = "+-" + "-+-".join("-" * w for w in col_widths) + "-+"
307403

308-
lines = ["Benchmark Results:", sep]
404+
n_runs = len(self.runs) if self.runs else 1
405+
header = f"Benchmark Results ({n_runs} run{'s' if n_runs != 1 else ''}):"
406+
lines = [header, sep]
309407
for i, row in enumerate(rows):
310408
line = (
311409
"| "
@@ -375,7 +473,11 @@ def collect_metrics(
375473
elif completion_condition == CompletionCondition.IDLE:
376474
now = time.monotonic()
377475
cur_processed = sample.total_processed_records
378-
if prev_processed is not None and cur_processed == prev_processed and cur_processed > 0:
476+
if (
477+
prev_processed is not None
478+
and cur_processed == prev_processed
479+
and cur_processed > 0
480+
):
379481
if idle_started_at is None:
380482
idle_started_at = now
381483
elif now - idle_started_at >= idle_interval_s:
@@ -435,9 +537,9 @@ def bench(
435537
metrics = BenchmarkMetrics.from_samples(samples)
436538
return BenchmarkResult(
437539
name=benchmark_name,
438-
metrics=metrics,
439540
start_time=start_time,
440541
end_time=end_time,
542+
_metrics=metrics,
441543
)
442544

443545

0 commit comments

Comments
 (0)