[python-sdk] Remove references to 'fda' in benchmarking docs, add runtime_revision to Python SDK

Signed-off-by: Heorhii Bulakh <bulakh.96@gmail.com>
Karakatiza666 committed Mar 9, 2026
commit d59b4f71f38e0308462cc55dd4c5dcdae0ad7544
63 changes: 34 additions & 29 deletions python/docs/examples.rst
@@ -484,31 +484,11 @@ report to a Bencher-compatible server.
 starting the pipeline before calling :func:`.bench` or
 :func:`.collect_metrics`, and for stopping it afterwards.
 
-.. list-table:: Python equivalents of ``fda bench`` flags
-   :header-rows: 1
-   :widths: 40 60
-
-   * - ``fda`` flag
-     - Python equivalent
-   * - ``fda bench <name>``
-     - :func:`.bench` (collects until ``pipeline_complete``)
-   * - ``--duration <secs>``
-     - ``bench(pipeline, duration_secs=<secs>)``
-   * - ``--upload``
-     - :func:`.upload_to_bencher`
-   * - ``--branch <name>``
-     - ``upload_to_bencher(..., branch=<name>)``
-   * - ``--start-point <branch>``
-     - ``upload_to_bencher(..., start_point=<branch>)``
-   * - ``--start-point-clone-thresholds``
-     - ``upload_to_bencher(..., start_point_clone_thresholds=True)``
-
 Collect and Display Metrics
 ---------------------------
 
-This is the Python equivalent of ``fda bench <pipeline>``. Stop any existing
-run, start fresh, wait for all bounded input to be processed, then print a
-human-readable results table.
+Stop any existing run, start fresh, wait for all bounded input to be processed,
+then print a human-readable results table.
 
 .. code-block:: python
 
@@ -552,7 +532,6 @@ Collect Metrics for a Fixed Duration
 
 For streaming pipelines whose input never ends naturally, pass
 ``duration_secs`` to stop collection after a fixed wall-clock window.
-This is the Python equivalent of ``fda bench --duration <secs>``.
 
 .. code-block:: python
 
@@ -571,15 +550,42 @@ This is the Python equivalent of ``fda bench --duration <secs>``.
 
     print(result.format_table())
 
+Aggregate Metrics Across Multiple Runs
+---------------------------------------
+
+Run the benchmark several times and combine the results with
+:meth:`.BenchmarkResult.aggregate`. The aggregated result averages throughput,
+uptime, and state-amplification across runs, takes the min-of-mins and
+max-of-maxes for memory and storage, and can be passed directly to
+:func:`.upload_to_bencher` just like a single-run result.
+
+.. code-block:: python
+
+   from feldera import FelderaClient, bench
+   from feldera.benchmarking import BenchmarkResult
+
+   client = FelderaClient("http://localhost:8080")
+   pipeline = client.get_pipeline("my_bench")
+
+   runs = []
+   for _ in range(3):
+       pipeline.stop()
+       pipeline.start()
+       runs.append(bench(pipeline))
+
+   pipeline.stop()
+
+   result = BenchmarkResult.aggregate(runs)
+   print(result.format_table())  # shows avg with stddev %
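The aggregation rules described in the added section (averages for rates, min-of-mins and max-of-maxes for the memory envelope) can be sketched as a standalone model. The `RunMetrics` dataclass and `aggregate` function below are illustrative only, not the SDK's actual `BenchmarkResult` type:

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class RunMetrics:
    # Simplified stand-in for one run's metrics (illustrative, not the SDK type).
    throughput: float  # records/s
    uptime_ms: float
    memory_bytes_min: int
    memory_bytes_max: int


def aggregate(runs: list[RunMetrics]) -> RunMetrics:
    """Combine runs: average the rates, widen the memory envelope."""
    return RunMetrics(
        throughput=mean(r.throughput for r in runs),
        uptime_ms=mean(r.uptime_ms for r in runs),
        memory_bytes_min=min(r.memory_bytes_min for r in runs),  # min-of-mins
        memory_bytes_max=max(r.memory_bytes_max for r in runs),  # max-of-maxes
    )
```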

 Upload Results to Bencher
 --------------------------
 
-This is the Python equivalent of ``fda bench --upload``. After collecting
-metrics, call :func:`.upload_to_bencher` to POST the BMF report to a
-Bencher-compatible server.
+After collecting metrics, call :func:`.upload_to_bencher` to POST the BMF
+report to a Bencher-compatible server.
 
 Passing ``feldera_client`` enriches the run context with the Feldera instance
-edition and revision (the same information ``fda`` derives automatically).
+edition and revision.
 
 API token and project can be supplied as parameters or via the
 ``BENCHER_API_TOKEN`` and ``BENCHER_PROJECT`` environment variables.
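The parameter-or-environment fallback described here can be modelled as below; `resolve_bencher_settings` is a hypothetical helper written for illustration, not part of the SDK:

```python
import os


def resolve_bencher_settings(api_token=None, project=None, env=None):
    """Explicit parameters win; otherwise fall back to the documented
    BENCHER_API_TOKEN / BENCHER_PROJECT environment variables."""
    env = os.environ if env is None else env
    token = api_token if api_token is not None else env.get("BENCHER_API_TOKEN")
    project = project if project is not None else env.get("BENCHER_PROJECT")
    if token is None or project is None:
        raise ValueError("Bencher API token and project are required")
    return token, project
```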
@@ -630,8 +636,7 @@ Track Results Against a Baseline Branch
 -----------------------------------------
 
 Use ``start_point`` to initialise a new branch from an existing one and
-optionally inherit its alert thresholds. This mirrors
-``fda bench --upload --start-point <branch> --start-point-clone-thresholds``.
+optionally inherit its alert thresholds.
 
 .. code-block:: python
 
86 changes: 59 additions & 27 deletions python/feldera/benchmarking.py
@@ -6,6 +6,13 @@
 
 This mirrors the `fda bench --upload` CLI functionality so Python-based benchmark
 workloads can collect and upload results programmatically.
+
+**Differences from** ``fda bench``: throughput, uptime, and state-amplification
+are computed as deltas between the first and last collected sample (i.e. over the
+observation window), whereas ``fda bench`` uses absolute values since pipeline
+start. This means the two tools will report different numbers for the same run,
+but the Python SDK approach is more appropriate when benchmarking a pipeline that
+may have been running before collection started.
 """
 
 import json
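The delta-over-window behaviour called out in the new docstring can be illustrated with two toy functions. The sample dicts and field names here are hypothetical, not the controller's actual metrics schema:

```python
def window_throughput(samples: list[dict]) -> float:
    """Python SDK style: delta between the first and last sample, divided
    by the observation window (records/s)."""
    first, last = samples[0], samples[-1]
    dt_s = (last["ts_ms"] - first["ts_ms"]) / 1000.0
    return (last["total_records"] - first["total_records"]) / dt_s


def absolute_throughput(samples: list[dict]) -> float:
    """fda bench style: absolute totals since pipeline start (ts_ms is
    assumed to be measured from pipeline start)."""
    last = samples[-1]
    return last["total_records"] / (last["ts_ms"] / 1000.0)
```

For a pipeline that was already running before collection began (first sample at 10 s with 5,000 records, last at 20 s with 6,000), the window figure is 100 records/s while the absolute figure is 300 records/s, which is why the two tools can disagree on the same run.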
@@ -368,35 +375,54 @@ def _bytes_with_stddev(avg: int, values: list[int]) -> str:
 
         rows: list[tuple[str, str]] = [
             ("Metric", "Value"),
-            ("Throughput (records/s)", _val_with_stddev(
-                m.throughput,
-                [r.metrics.throughput for r in self.runs] if is_multi else [],
-            )),
-            ("Memory", _bytes_with_stddev(
-                m.memory_bytes_max,
-                [r.metrics.memory_bytes_max for r in self.runs] if is_multi else [],
-            )),
-            ("Storage", _bytes_with_stddev(
-                m.storage_bytes_max,
-                [r.metrics.storage_bytes_max for r in self.runs] if is_multi else [],
-            )),
-            ("Uptime [ms]", _val_with_stddev(
-                m.uptime_ms,
-                [r.metrics.uptime_ms for r in self.runs] if is_multi else [],
-            )),
+            (
+                "Throughput (records/s)",
+                _val_with_stddev(
+                    m.throughput,
+                    [r.metrics.throughput for r in self.runs] if is_multi else [],
+                ),
+            ),
+            (
+                "Memory",
+                _bytes_with_stddev(
+                    m.memory_bytes_max,
+                    [r.metrics.memory_bytes_max for r in self.runs] if is_multi else [],
+                ),
+            ),
+            (
+                "Storage",
+                _bytes_with_stddev(
+                    m.storage_bytes_max,
+                    [r.metrics.storage_bytes_max for r in self.runs]
+                    if is_multi
+                    else [],
+                ),
+            ),
+            (
+                "Uptime [ms]",
+                _val_with_stddev(
+                    m.uptime_ms,
+                    [r.metrics.uptime_ms for r in self.runs] if is_multi else [],
+                ),
+            ),
         ]
         if m.state_amplification is not None:
-            rows.append(("State Amplification", _val_with_stddev(
-                m.state_amplification,
-                [
-                    r.metrics.state_amplification
-                    for r in self.runs
-                    if r.metrics.state_amplification is not None
-                ]
-                if is_multi
-                else [],
-                fmt=".2f",
-            )))
+            rows.append(
+                (
+                    "State Amplification",
+                    _val_with_stddev(
+                        m.state_amplification,
+                        [
+                            r.metrics.state_amplification
+                            for r in self.runs
+                            if r.metrics.state_amplification is not None
+                        ]
+                        if is_multi
+                        else [],
+                        fmt=".2f",
+                    ),
+                )
+            )
 
         col_widths = [max(len(row[i]) for row in rows) for i in range(len(rows[0]))]
         sep = "+-" + "-+-".join("-" * w for w in col_widths) + "-+"
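The `_val_with_stddev` helper reformatted above annotates a multi-run average with its relative standard deviation. The sketch below models that behaviour; the exact format string and output layout are assumptions, not the SDK's verbatim output:

```python
import statistics


def val_with_stddev(avg, values, fmt=",.0f"):
    """Render avg; for multi-run aggregates, append relative stddev in %.

    A single-run result passes an empty `values` list and gets the plain
    average; an aggregate passes the per-run values.
    """
    if len(values) < 2:
        return format(avg, fmt)
    rel = 100.0 * statistics.pstdev(values) / statistics.mean(values)
    return f"{format(avg, fmt)} (±{rel:.1f}%)"
```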
@@ -631,6 +657,12 @@ def upload_to_bencher(
             logger.warning(
                 "Unknown Feldera edition '%s'; not setting repo hash.", edition
             )
+
+        # Auto-resolve git_hash from runtime_revision if not provided by caller,
+        # mirroring the fda bench logic (runtime_version takes precedence, then
+        # runtime_revision).
+        if git_hash is None and config.runtime_revision:
+            git_hash = config.runtime_revision
     except Exception as exc:
         logger.warning("Failed to fetch Feldera instance config: %s", exc)
 
1 change: 1 addition & 0 deletions python/feldera/rest/feldera_config.py
@@ -44,6 +44,7 @@ def __init__(self, cfg: dict):
         self.edition = FelderaEdition.from_value(cfg.get("edition"))
         self.license_validity = cfg.get("license_validity")
         self.revision = cfg.get("revision")
+        self.runtime_revision = cfg.get("runtime_revision", "")
         self.telemetry = cfg.get("telemetry")
         self.update_info = cfg.get("update_info")
         self.version = cfg.get("version")