
Commit ccf7f2f (1 parent: c6214c7)

Add Python SDK utilities for equivalent

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>

File tree: 4 files changed (+735, -0 lines)


python/docs/examples.rst

Lines changed: 188 additions & 0 deletions
@@ -467,3 +467,191 @@ Specifying Data Sources / Sinks
To connect Feldera to various data sources or sinks, you can define them in the SQL code.
Refer to the connector documentation at: https://docs.feldera.com/connectors/

Benchmarking Pipelines
======================

The :mod:`feldera.benchmarking` module provides utilities to collect and upload
benchmark metrics for Feldera pipelines. It polls :meth:`.Pipeline.stats` in a
loop, aggregates the snapshots into :class:`.BenchmarkMetrics`, and can
optionally upload a
`Bencher Metric Format (BMF) <https://bencher.dev/docs/reference/test-harnesses/>`_
report to a Bencher-compatible server.

.. note::
   These utilities only **observe** a running pipeline — they do not start,
   stop, or otherwise manage pipeline lifetime. The caller is responsible for
   starting the pipeline before calling :func:`.bench` or
   :func:`.collect_metrics`, and for stopping it afterwards.

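The poll-and-aggregate loop described above can be modelled in plain Python. This is an illustrative sketch, not the SDK's actual implementation; the snapshot field names (``pipeline_complete``, ``total_processed_records``) and the aggregation into a simple dataclass are assumptions for the example.

```python
import time
from dataclasses import dataclass


@dataclass
class Metrics:
    """Aggregated view over a series of stats snapshots."""
    records: int = 0
    elapsed_secs: float = 0.0

    @property
    def throughput(self) -> float:
        # Records per second over the collection window.
        return self.records / self.elapsed_secs if self.elapsed_secs else 0.0


def collect(poll, interval_secs: float = 0.0) -> Metrics:
    """Call poll() in a loop until it reports completion, then aggregate."""
    start = time.monotonic()
    snapshot = poll()
    while not snapshot["pipeline_complete"]:
        time.sleep(interval_secs)
        snapshot = poll()
    return Metrics(
        records=snapshot["total_processed_records"],
        elapsed_secs=time.monotonic() - start,
    )


# Simulate three stats snapshots from a pipeline with bounded input.
snapshots = iter([
    {"pipeline_complete": False, "total_processed_records": 400_000},
    {"pipeline_complete": False, "total_processed_records": 800_000},
    {"pipeline_complete": True, "total_processed_records": 1_000_000},
])
m = collect(lambda: next(snapshots))
print(m.records)  # 1000000
```

The real module returns richer metrics, but the control flow is the same: poll until the pipeline signals completion (or a deadline passes), then summarise.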
.. list-table:: Python equivalents of ``fda bench`` flags
   :header-rows: 1
   :widths: 40 60

   * - ``fda`` flag
     - Python equivalent
   * - ``fda bench <name>``
     - :func:`.bench` (collects until ``pipeline_complete``)
   * - ``--duration <secs>``
     - ``bench(pipeline, duration_secs=<secs>)``
   * - ``--upload``
     - :func:`.upload_to_bencher`
   * - ``--branch <name>``
     - ``upload_to_bencher(..., branch=<name>)``
   * - ``--start-point <branch>``
     - ``upload_to_bencher(..., start_point=<branch>)``
   * - ``--start-point-clone-thresholds``
     - ``upload_to_bencher(..., start_point_clone_thresholds=True)``

Collect and Display Metrics
---------------------------

This is the Python equivalent of ``fda bench <pipeline>``. Stop any existing
run, start fresh, wait for all bounded input to be processed, then print a
human-readable results table.

.. code-block:: python

   from feldera import FelderaClient, PipelineBuilder, bench

   client = FelderaClient("http://localhost:8080")

   sql = """
   CREATE TABLE events (id INT, value DOUBLE) WITH (
       'connectors' = '[{
           "transport": {
               "name": "datagen",
               "config": {"plan": [{"limit": 1000000, "rate": 100000}]}
           }
       }]'
   );
   CREATE MATERIALIZED VIEW totals AS
   SELECT COUNT(*) AS n, SUM(value) AS total FROM events;
   """

   pipeline = PipelineBuilder(client, name="my_bench", sql=sql).create_or_replace()

   # Stop any running instance for a reproducible baseline, then start fresh.
   pipeline.stop()
   pipeline.start()

   # Poll stats until pipeline_complete is True (all bounded input consumed).
   result = bench(pipeline)

   # Stop the pipeline now that collection is done.
   pipeline.stop()

   # Print the results table.
   print(result.format_table())

   # Or access the BMF dict directly.
   print(result.to_json())

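The dict behind ``to_json()`` follows the Bencher Metric Format: a mapping from benchmark name to measures, each carrying a ``value`` and optional ``lower_value``/``upper_value`` bounds. A hand-built sketch of that shape follows; the benchmark and measure names here are illustrative, not necessarily what the SDK emits.

```python
import json

# Minimal BMF report: {benchmark: {measure: {"value": ...}}}.
bmf = {
    "my_bench": {
        "throughput": {"value": 98_500.0},
        "latency": {
            "value": 12.5,
            "lower_value": 11.0,   # optional bounds around the value
            "upper_value": 14.2,
        },
    }
}

report = json.dumps(bmf, indent=2)
print(report)
```

Because the report is plain JSON, it can be archived alongside CI artifacts or posted to any server that understands BMF.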
Collect Metrics for a Fixed Duration
------------------------------------

For streaming pipelines whose input never ends naturally, pass
``duration_secs`` to stop collection after a fixed wall-clock window.
This is the Python equivalent of ``fda bench --duration <secs>``.

.. code-block:: python

   from feldera import FelderaClient, bench

   client = FelderaClient("http://localhost:8080")
   pipeline = client.get_pipeline("my_streaming_pipeline")

   pipeline.stop()
   pipeline.start()

   # Collect for 60 seconds regardless of pipeline_complete.
   result = bench(pipeline, duration_secs=60)

   pipeline.stop()

   print(result.format_table())

Upload Results to Bencher
-------------------------

This is the Python equivalent of ``fda bench --upload``. After collecting
metrics, call :func:`.upload_to_bencher` to POST the BMF report to a
Bencher-compatible server.

Passing ``feldera_client`` enriches the run context with the Feldera instance
edition and revision (the same information ``fda`` derives automatically).

The API token and project can be supplied as parameters or via the
``BENCHER_API_TOKEN`` and ``BENCHER_PROJECT`` environment variables.

.. code-block:: python

   from feldera import FelderaClient, PipelineBuilder, bench, upload_to_bencher

   client = FelderaClient("http://localhost:8080")

   sql = """
   CREATE TABLE events (id INT, value DOUBLE) WITH (
       'connectors' = '[{
           "transport": {
               "name": "datagen",
               "config": {"plan": [{"limit": 1000000, "rate": 100000}]}
           }
       }]'
   );
   CREATE MATERIALIZED VIEW totals AS
   SELECT COUNT(*) AS n, SUM(value) AS total FROM events;
   """

   pipeline = PipelineBuilder(client, name="my_bench", sql=sql).create_or_replace()

   pipeline.stop()
   pipeline.start()
   result = bench(pipeline)
   pipeline.stop()

   print(result.format_table())

   # Upload to https://benchmarks.feldera.io (the default host).
   upload_to_bencher(
       result,
       project="my-project",        # or set BENCHER_PROJECT env var
       token="YOUR_BENCHER_TOKEN",  # or set BENCHER_API_TOKEN env var
       branch="main",
       feldera_client=client,       # adds edition/revision to run context
   )

.. note::
   The ``host`` parameter (or ``BENCHER_HOST`` environment variable) can point
   to any Bencher-compatible server. It defaults to
   ``https://benchmarks.feldera.io``.

Track Results Against a Baseline Branch
---------------------------------------

Use ``start_point`` to initialise a new branch from an existing one and
optionally inherit its alert thresholds. This mirrors
``fda bench --upload --start-point <branch> --start-point-clone-thresholds``.

.. code-block:: python

   from feldera import FelderaClient, bench, upload_to_bencher

   client = FelderaClient("http://localhost:8080")
   pipeline = client.get_pipeline("my_bench")

   pipeline.stop()
   pipeline.start()
   result = bench(pipeline)
   pipeline.stop()

   upload_to_bencher(
       result,
       project="my-project",
       token="YOUR_BENCHER_TOKEN",
       branch="feature/my-optimisation",   # the branch being tested
       start_point="main",                 # branch to branch off from
       start_point_clone_thresholds=True,  # inherit alert thresholds
       start_point_max_versions=10,        # how many historical runs to consider
       feldera_client=client,
   )

python/docs/feldera.rst

Lines changed: 8 additions & 0 deletions
@@ -41,6 +41,14 @@ feldera.runtime\_config module
   :undoc-members:
   :show-inheritance:

feldera.benchmarking module
---------------------------

.. automodule:: feldera.benchmarking
   :members:
   :undoc-members:
   :show-inheritance:

Subpackages
-----------

python/feldera/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -2,6 +2,13 @@
from feldera.pipeline import Pipeline as Pipeline
from feldera.pipeline_builder import PipelineBuilder as PipelineBuilder
from feldera.rest._helpers import determine_client_version
from feldera.benchmarking import (
    BenchmarkResult as BenchmarkResult,
    BenchmarkMetrics as BenchmarkMetrics,
    collect_metrics as collect_metrics,
    bench as bench,
    upload_to_bencher as upload_to_bencher,
)

__version__ = determine_client_version()
