@@ -467,3 +467,191 @@ Specifying Data Sources / Sinks
467467
468468To connect Feldera to various data sources or sinks, you can define them in the SQL code.
469469Refer to the connector documentation at: https://docs.feldera.com/connectors/
470+
471+ Benchmarking Pipelines
472+ ======================
473+
474+ The :mod: `feldera.benchmarking ` module provides utilities to collect and upload
475+ benchmark metrics for Feldera pipelines. It polls :meth: `.Pipeline.stats ` in a
476+ loop, aggregates the snapshots into :class: `.BenchmarkMetrics `, and can
477+ optionally upload a
478+ `Bencher Metric Format (BMF) <https://bencher.dev/docs/reference/test-harnesses/ >`_
479+ report to a Bencher-compatible server.
480+
481+ .. note ::
482+ These utilities only **observe ** a running pipeline — they do not start,
483+ stop, or otherwise manage pipeline lifetime. The caller is responsible for
484+ starting the pipeline before calling :func: `.bench ` or
485+ :func: `.collect_metrics `, and for stopping it afterwards.
486+
487+ .. list-table :: Python equivalents of ``fda bench`` flags
488+ :header-rows: 1
489+ :widths: 40 60
490+
491+ * - ``fda `` flag
492+ - Python equivalent
493+ * - ``fda bench <name> ``
494+ - :func: `.bench ` (collects until ``pipeline_complete ``)
495+ * - ``--duration <secs> ``
496+ - ``bench(pipeline, duration_secs=<secs>) ``
497+ * - ``--upload ``
498+ - :func: `.upload_to_bencher `
499+ * - ``--branch <name> ``
500+ - ``upload_to_bencher(..., branch=<name>) ``
501+ * - ``--start-point <branch> ``
502+ - ``upload_to_bencher(..., start_point=<branch>) ``
503+ * - ``--start-point-clone-thresholds ``
504+ - ``upload_to_bencher(..., start_point_clone_thresholds=True) ``
505+
506+ Collect and Display Metrics
507+ ---------------------------
508+
509+ This is the Python equivalent of ``fda bench <pipeline> ``. Stop any existing
510+ run, start fresh, wait for all bounded input to be processed, then print a
511+ human-readable results table.
512+
513+ .. code-block :: python
514+
515+ from feldera import FelderaClient, PipelineBuilder, bench
516+
517+ client = FelderaClient(" http://localhost:8080" )
518+
519+ sql = """
520+ CREATE TABLE events (id INT, value DOUBLE) WITH (
521+ 'connectors' = '[{
522+ "transport": {
523+ "name": "datagen",
524+ "config": {"plan": [{"limit": 1000000, "rate": 100000}]}
525+ }
526+ }]'
527+ );
528+ CREATE MATERIALIZED VIEW totals AS
529+ SELECT COUNT(*) AS n, SUM(value) AS total FROM events;
530+ """
531+
532+ pipeline = PipelineBuilder(client, name = " my_bench" , sql = sql).create_or_replace()
533+
534+ # Stop any running instance for a reproducible baseline, then start fresh.
535+ pipeline.stop()
536+ pipeline.start()
537+
538+ # Poll stats until pipeline_complete is True (all bounded input consumed).
539+ result = bench(pipeline)
540+
541+ # Stop the pipeline now that collection is done.
542+ pipeline.stop()
543+
544+ # Print the results table.
545+ print (result.format_table())
546+
547+ # Or access the BMF dict directly.
548+ print (result.to_json())
549+
550+ Collect Metrics for a Fixed Duration
551+ -------------------------------------
552+
553+ For streaming pipelines whose input never ends naturally, pass
554+ ``duration_secs `` to stop collection after a fixed wall-clock window.
555+ This is the Python equivalent of ``fda bench --duration <secs> ``.
556+
557+ .. code-block :: python
558+
559+ from feldera import FelderaClient, bench
560+
561+ client = FelderaClient(" http://localhost:8080" )
562+ pipeline = client.get_pipeline(" my_streaming_pipeline" )
563+
564+ pipeline.stop()
565+ pipeline.start()
566+
567+ # Collect for 60 seconds regardless of pipeline_complete.
568+ result = bench(pipeline, duration_secs = 60 )
569+
570+ pipeline.stop()
571+
572+ print (result.format_table())
573+
574+ Upload Results to Bencher
575+ --------------------------
576+
577+ This is the Python equivalent of ``fda bench --upload ``. After collecting
578+ metrics, call :func: `.upload_to_bencher ` to POST the BMF report to a
579+ Bencher-compatible server.
580+
581+ Passing ``feldera_client `` enriches the run context with the Feldera instance
582+ edition and revision (the same information ``fda `` derives automatically).
583+
584+ API token and project can be supplied as parameters or via the
585+ ``BENCHER_API_TOKEN `` and ``BENCHER_PROJECT `` environment variables.
586+
587+ .. code-block :: python
588+
589+ from feldera import FelderaClient, PipelineBuilder, bench, upload_to_bencher
590+
591+ client = FelderaClient(" http://localhost:8080" )
592+
593+ sql = """
594+ CREATE TABLE events (id INT, value DOUBLE) WITH (
595+ 'connectors' = '[{
596+ "transport": {
597+ "name": "datagen",
598+ "config": {"plan": [{"limit": 1000000, "rate": 100000}]}
599+ }
600+ }]'
601+ );
602+ CREATE MATERIALIZED VIEW totals AS
603+ SELECT COUNT(*) AS n, SUM(value) AS total FROM events;
604+ """
605+
606+ pipeline = PipelineBuilder(client, name = " my_bench" , sql = sql).create_or_replace()
607+
608+ pipeline.stop()
609+ pipeline.start()
610+ result = bench(pipeline)
611+ pipeline.stop()
612+
613+ print (result.format_table())
614+
615+ # Upload to https://benchmarks.feldera.io (the default host).
616+ upload_to_bencher(
617+ result,
618+ project = " my-project" , # or set BENCHER_PROJECT env var
619+ token = " YOUR_BENCHER_TOKEN" , # or set BENCHER_API_TOKEN env var
620+ branch = " main" ,
621+ feldera_client = client, # adds edition/revision to run context
622+ )
623+
624+ .. note ::
625+ The ``host `` parameter (or ``BENCHER_HOST `` environment variable) can point
626+ to any Bencher-compatible server. It defaults to
627+ ``https://benchmarks.feldera.io ``.
628+
629+ Track Results Against a Baseline Branch
630+ -----------------------------------------
631+
632+ Use ``start_point `` to initialise a new branch from an existing one and
633+ optionally inherit its alert thresholds. This mirrors
634+ ``fda bench --upload --start-point <branch> --start-point-clone-thresholds ``.
635+
636+ .. code-block :: python
637+
638+ from feldera import FelderaClient, bench, upload_to_bencher
639+
640+ client = FelderaClient(" http://localhost:8080" )
641+ pipeline = client.get_pipeline(" my_bench" )
642+
643+ pipeline.stop()
644+ pipeline.start()
645+ result = bench(pipeline)
646+ pipeline.stop()
647+
648+ upload_to_bencher(
649+ result,
650+ project = " my-project" ,
651+ token = " YOUR_BENCHER_TOKEN" ,
652+ branch = " feature/my-optimisation" , # the branch being tested
653+ start_point = " main" , # branch to branch off from
654+ start_point_clone_thresholds = True , # inherit alert thresholds
655+ start_point_max_versions = 10 , # how many historical runs to consider
656+ feldera_client = client,
657+ )
0 commit comments