#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "dash",
# "pandas",
# "plotly",
# ]
# ///
"""
Dashboard for Feldera benchmark CSV output.
Run with (needs uv):
$ ./dashboard.py -h
See `scalebench.py` to generate the CSV data for it.
"""
from __future__ import annotations
import argparse
import base64
import os
import re
from difflib import SequenceMatcher
from io import StringIO
from pathlib import Path
from typing import Iterable, List, Tuple
import pandas as pd
import plotly.graph_objects as go
from dash import Dash, Input, Output, State, dcc, html
# CSV loaded on every refresh: ``bench_results.csv`` in this script's directory.
# The ``--data`` command-line option rebinds this name when run as a script.
DEFAULT_DATA_PATH = str(Path(__file__).resolve().parents[0] / "bench_results.csv")
# CSV column used to split rows into the "Primary" and "Compare" result sets.
VERSION_COLUMN = "platform_version"
# (CSV column, chart title) pairs; update_graphs renders one card per entry,
# in this order.
METRICS: List[Tuple[str, str]] = [
    ("throughput_value", "Throughput [records/s]"),
    ("storage_value", "Storage [bytes]"),
    ("memory_value", "Memory [bytes]"),
    ("buffered_input_records_value", "Buffered Input Records"),
    ("state_amplification_value", "State Amplification"),
]
# Metrics whose hover text and y-axis ticks use binary byte units (KiB, MiB, ...).
BYTE_METRICS = {"storage_value", "memory_value"}
# Trace colors: ACCENT_COLOR for the primary version, ACCENT_COLOR_ALT for the
# compare version and for the "Ideal speedup" line.
ACCENT_COLOR = "#C533B9"
ACCENT_COLOR_ALT = "#FCAF4F"
GRID_COLOR = "#D9D9D9"
TEXT_COLOR = "#1E1E1E"
# Dashed reference lines on the IO-writes bar chart, in MiB/s.
IO_WRITE_WARN_MIB = 1600
IO_WRITE_MAX_MIB = 3000
# Dashed reference lines on the memory-bandwidth chart, in GB/s.
MEM_BW_WARN_GBS = 160
MEM_BW_MAX_GBS = 200
def load_csv(path: str) -> pd.DataFrame:
    """Read the benchmark CSV stored at ``path`` into a DataFrame."""
    frame = pd.read_csv(path)
    return frame
def coerce_numeric(df: pd.DataFrame, columns: Iterable[str]) -> pd.DataFrame:
    """
    Convert the listed columns of ``df`` to numeric dtype, in place.

    Names that are not columns of ``df`` are ignored; values that cannot be
    parsed become NaN. The same (mutated) frame is returned.
    """
    present = [name for name in columns if name in df.columns]
    for name in present:
        df[name] = pd.to_numeric(df[name], errors="coerce")
    return df
def prepare_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a normalized copy of the raw benchmark rows.

    The copy gets a ``pipeline_name`` column (``"u64"``) when the CSV has
    none, a string-typed version column, numeric measurement columns, and a
    derived ``payload_kib`` column (``pd.NA`` when ``payload_bytes`` is
    absent). The input frame is not modified.
    """
    prepared = df.copy()
    if "pipeline_name" not in prepared.columns:
        prepared["pipeline_name"] = "u64"
    prepared[VERSION_COLUMN] = prepared[VERSION_COLUMN].astype(str)

    run_columns = ["pipeline_workers", "payload_bytes", "duration_s"]
    metric_columns = [
        "throughput_value",
        "storage_value",
        "memory_value",
        "buffered_input_records_value",
        "state_amplification_value",
    ]
    bandwidth_columns = [
        "mem_bw_read_min",
        "mem_bw_read_max",
        "mem_bw_read_mean",
        "mem_bw_read_stdev",
        "mem_bw_write_min",
        "mem_bw_write_max",
        "mem_bw_write_mean",
        "mem_bw_write_stdev",
        "mem_bw_total_min",
        "mem_bw_total_max",
        "mem_bw_total_mean",
        "mem_bw_total_stdev",
    ]
    prepared = coerce_numeric(
        prepared, [*run_columns, *metric_columns, *bandwidth_columns]
    )

    if "payload_bytes" not in prepared.columns:
        prepared["payload_kib"] = pd.NA
    else:
        prepared["payload_kib"] = prepared["payload_bytes"] / 1024.0
    return prepared
def aggregate_metric(df: pd.DataFrame, metric_key: str) -> pd.DataFrame:
    """
    Summarize ``metric_key`` per ``pipeline_workers`` value.

    Returns one row per worker count (a missing worker count forms its own
    group) with the columns ``pipeline_workers``, ``value_mean``,
    ``value_min`` and ``value_max``.
    """
    per_worker = df.groupby("pipeline_workers", dropna=False)[metric_key]
    summary = per_worker.agg(
        value_mean="mean",
        value_min="min",
        value_max="max",
    )
    return summary.reset_index()
def extract_program_sql(program_df: pd.DataFrame) -> str:
    """
    Get the program that was run from the results CSV and mark its
    variable parts for display.

    Rows are ordered by whichever of ``pipeline_workers``, ``payload_bytes``
    and ``run_id`` exist; the first and last non-null ``program_sql`` values
    are then compared and the differing parts replaced by a placeholder.
    Literal ``\\n`` sequences in the stored SQL become real newlines.
    """
    unavailable = "No SQL available in CSV."
    if "program_sql" not in program_df.columns:
        return unavailable

    ordering = [
        name
        for name in ("pipeline_workers", "payload_bytes", "run_id")
        if name in program_df.columns
    ]
    ordered = program_df.sort_values(ordering) if ordering else program_df
    candidates = ordered["program_sql"].dropna().tolist()
    if not candidates:
        return unavailable

    first_sql = candidates[0]
    if not (isinstance(first_sql, str) and first_sql):
        return unavailable

    last_sql = candidates[-1]
    comparable = (
        len(candidates) > 1 and isinstance(last_sql, str) and bool(last_sql)
    )
    if comparable:
        rendered = parameterize_sql(first_sql, last_sql)
    else:
        # Nothing usable to compare against: show the first program as-is.
        rendered = first_sql
    return rendered.replace("\\n", "\n")
# One token is a whitespace run, a digit run, a run of letters/underscores,
# or any other single character.
TOKEN_PATTERN = re.compile(r"\s+|\d+|[A-Za-z_]+|[^A-Za-z0-9_\s]")


def tokenize_sql(sql: str) -> list[str]:
    """Split ``sql`` into tokens; joining them reproduces the input."""
    return [match.group(0) for match in TOKEN_PATTERN.finditer(sql)]
def parameterize_sql(
    first_sql: str, last_sql: str, placeholder: str = "$VARIABLE"
) -> str:
    """
    When the CSV holds programs that differ only in some parameters
    (e.g., the ingest size), show one program with every differing
    stretch replaced by `placeholder`.

    Both programs are tokenized and aligned; tokens common to both are
    taken from `first_sql`, and each differing stretch collapses into a
    single `placeholder`.
    """
    base_tokens = tokenize_sql(first_sql)
    other_tokens = tokenize_sql(last_sql)
    opcodes = SequenceMatcher(
        None, base_tokens, other_tokens, autojunk=False
    ).get_opcodes()

    pieces: list[str] = []
    previous_tag = "equal"
    for tag, start, stop, _j1, _j2 in opcodes:
        if tag == "equal":
            pieces.extend(base_tokens[start:stop])
        elif previous_tag == "equal":
            # First differing opcode of a stretch: emit one placeholder only.
            pieces.append(placeholder)
        previous_tag = tag
    return "".join(pieces)
def payload_values_for_program(
    program_df: pd.DataFrame,
) -> tuple[list[float], list[str]]:
    """
    Collect the distinct payload sizes in ``program_df`` together with the
    labels shown for them on the graphs.

    ``payload_bytes`` is preferred (labels in binary byte units); otherwise
    ``payload_kib`` is used (labels like ``"0.500 KiB"``). Values are sorted
    ascending with missing entries dropped. Two empty lists are returned when
    neither column exists.
    """
    available = program_df.columns
    has_bytes = "payload_bytes" in available
    if not has_bytes and "payload_kib" not in available:
        return [], []

    column = "payload_bytes" if has_bytes else "payload_kib"
    sizes = program_df[column].dropna().drop_duplicates().sort_values().tolist()
    if has_bytes:
        names = [format_bytes_binary(size) for size in sizes]
    else:
        names = [f"{size:.3f} KiB" for size in sizes]
    return sizes, names
def payload_values_for_versions(
    primary_df: pd.DataFrame, compare_df: pd.DataFrame | None = None
) -> tuple[list[float], list[str]]:
    """
    Payload sizes (and labels) selectable for the current version choice.

    Without ``compare_df`` these are the primary set's sizes. With it, only
    sizes present in both sets are kept, in the primary set's order and with
    the primary set's labels; if the compare set has no sizes, the result is
    two empty lists.
    """
    primary_values, primary_labels = payload_values_for_program(primary_df)
    if compare_df is None:
        return primary_values, primary_labels

    other_values, _other_labels = payload_values_for_program(compare_df)
    if not other_values:
        return [], []

    shared = set(other_values)
    kept = [
        (size, name)
        for size, name in zip(primary_values, primary_labels)
        if size in shared
    ]
    return [size for size, _name in kept], [name for _size, name in kept]
def build_write_barplot(
    primary_df: pd.DataFrame,
    compare_df: pd.DataFrame | None,
    primary_label: str,
    compare_label: str | None = None,
) -> go.Figure | None:
    """
    Create a grouped bar chart of write rates for one or two result sets.

    The write rate of a row is ``storage_value / duration_s`` converted to
    MiB/s; rows are averaged per ``pipeline_workers`` value. When a compare
    set (with a label) is given and shares at least one worker count with
    the primary set, both sets are restricted to the shared worker counts.
    Dashed lines mark ``IO_WRITE_WARN_MIB`` and ``IO_WRITE_MAX_MIB``.

    Returns ``None`` when ``primary_df`` lacks the ``storage_value`` or
    ``duration_s`` column, or when no usable primary rows remain.
    """
    required = ("storage_value", "duration_s")
    if any(column not in primary_df.columns for column in required):
        return None

    def agg_mib_per_sec(df: pd.DataFrame) -> pd.DataFrame:
        """Mean MiB/s per worker count, ignoring unusable rows."""
        if df.empty:
            return pd.DataFrame(columns=["pipeline_workers", "mib_s"])
        rate = (df["storage_value"] / df["duration_s"]) / (1024.0**2)
        # A zero duration yields +/-inf, which would break the axis range.
        rate = rate.replace([float("inf"), float("-inf")], float("nan"))
        data = pd.DataFrame(
            {"pipeline_workers": df["pipeline_workers"], "mib_s": rate}
        )
        # Rows without a worker count cannot be labelled (int(NaN) raises).
        data = data.dropna(subset=["pipeline_workers", "mib_s"])
        return data.groupby("pipeline_workers")["mib_s"].mean().reset_index()

    primary_agg = agg_mib_per_sec(primary_df)
    if primary_agg.empty:
        return None

    compare_agg = None
    if compare_df is not None and compare_label:
        candidate = agg_mib_per_sec(compare_df)
        if not candidate.empty:
            compare_agg = candidate
            common_workers = set(primary_agg["pipeline_workers"]) & set(
                compare_agg["pipeline_workers"]
            )
            if common_workers:
                primary_agg = primary_agg[
                    primary_agg["pipeline_workers"].isin(common_workers)
                ]
                compare_agg = compare_agg[
                    compare_agg["pipeline_workers"].isin(common_workers)
                ]
            compare_agg = compare_agg.sort_values("pipeline_workers")
    primary_agg = primary_agg.sort_values("pipeline_workers")

    fig = go.Figure()

    def add_bars(agg: pd.DataFrame, label: str, color: str, group: str) -> None:
        """Add one bar trace for a per-worker aggregate."""
        fig.add_trace(
            go.Bar(
                x=[str(int(v)) for v in agg["pipeline_workers"]],
                y=agg["mib_s"],
                name=label,
                marker_color=color,
                offsetgroup=group,
                text=[f"{v:.0f} MiB/s" for v in agg["mib_s"]],
                textposition="outside",
                cliponaxis=False,
                # Plotly hover text uses <br> for line breaks.
                hovertemplate=(
                    f"{label}<br>Workers=%{{x}}<br>"
                    "IO Writes=%{y:.1f} MiB/s"
                ),
            )
        )

    add_bars(primary_agg, primary_label, ACCENT_COLOR, "primary")
    max_val = primary_agg["mib_s"].max()
    if compare_agg is not None and compare_label:
        add_bars(compare_agg, compare_label, ACCENT_COLOR_ALT, "compare")
        max_val = max(max_val, compare_agg["mib_s"].max())

    fig.update_layout(
        barmode="group",
        margin=dict(l=40, r=16, t=10, b=40),
        height=260,
        paper_bgcolor="#FFFFFF",
        plot_bgcolor="#FFFFFF",
        font=dict(family="DM Sans, Segoe UI, sans-serif", color=TEXT_COLOR),
        showlegend=True,
    )
    # Always leave room for the upper reference line, even for small bars.
    range_max = max(IO_WRITE_MAX_MIB * 1.05, max_val * 1.15)
    fig.update_yaxes(
        title="MiB/s",
        showgrid=True,
        gridcolor=GRID_COLOR,
        zeroline=False,
        range=[0, range_max],
    )
    worker_labels = [str(int(v)) for v in primary_agg["pipeline_workers"]]
    fig.update_xaxes(
        showgrid=False,
        zeroline=False,
        title="Pipeline workers",
        tickmode="array",
        tickvals=worker_labels,
        ticktext=worker_labels,
        type="category",
    )

    # (threshold, line color, annotation color) for the two reference lines.
    thresholds = [
        (IO_WRITE_WARN_MIB, "#F4E19E", "#8A7A2E"),
        (IO_WRITE_MAX_MIB, "#F7A1A1", "#9A2C2C"),
    ]
    for level, line_color, _text_color in thresholds:
        fig.add_shape(
            type="line",
            xref="paper",
            x0=0,
            x1=1,
            y0=level,
            y1=level,
            line=dict(color=line_color, width=2, dash="dash"),
        )
    for level, _line_color, text_color in thresholds:
        fig.add_annotation(
            x=1.02,
            y=level,
            xref="paper",
            yref="y",
            text=f"{level} MiB/s",
            showarrow=False,
            font=dict(size=10, color=text_color),
        )
    return fig
def build_gauge_card(
    figures: list[go.Figure | None],
    animation_delay: int = 0,
    title: str = "IO Writes",
    graph_height: str = "260px",
) -> html.Div:
    """
    Wrap figures in a titled dashboard card.

    ``None`` entries in ``figures`` are skipped; when nothing remains, the
    card shows an "unavailable" message instead of graphs. The title is
    followed by a "?" badge carrying a tooltip that explains how the IO
    write rate is derived. ``animation_delay`` is given in milliseconds.
    """
    rendered = [
        dcc.Graph(
            figure=figure,
            config={"displayModeBar": False, "responsive": True},
            className="graph",
            style={"height": graph_height, "width": "100%"},
        )
        for figure in figures
        if figure is not None
    ]
    if rendered:
        body = html.Div(rendered, className="gauge-grid")
    else:
        body = html.Div("IO write data unavailable.", className="card empty")

    tooltip_text = (
        "Calculated as total storage used divided by experiment duration in seconds; "
        "not continuous disk IO monitoring."
    )
    badge_style = {
        "display": "inline-block",
        "marginLeft": "6px",
        "width": "16px",
        "height": "16px",
        "lineHeight": "16px",
        "textAlign": "center",
        "borderRadius": "50%",
        "border": "1px solid #9aa0a6",
        "color": "#6b7280",
        "fontSize": "11px",
        "cursor": "default",
        "fontWeight": 600,
    }
    # "data-tooltip" is not a valid keyword name, hence the dict unpacking.
    help_badge = html.Span(
        "?",
        className="info-tooltip",
        style=badge_style,
        **{"data-tooltip": tooltip_text},
    )
    heading = html.Div([title, help_badge], className="card-title")
    return html.Div(
        className="card",
        style={"animationDelay": f"{animation_delay}ms"},
        children=[heading, body],
    )
def format_bytes_binary(value: float) -> str:
    """
    Format a byte count with binary units (B, KiB, MiB, GiB, TiB, PiB).

    The number is shown with three significant-ish digits: no decimals from
    100 upward, one decimal from 10 upward, two decimals below that. Values
    below 1 are shown as whole bytes. Anything that is not a finite number
    (``None``, unparseable text, NaN, infinity) yields ``"n/a"``.
    """
    import math

    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
    try:
        value = float(value)
    except (TypeError, ValueError):
        return "n/a"
    # NaN/inf pass float() but would render as "nan B" / "inf PiB".
    if not math.isfinite(value):
        return "n/a"
    if value < 1:
        return f"{value:.0f} B"

    unit_idx = 0
    # Stop at the largest unit instead of running off the end of the list.
    while value >= 1024.0 and unit_idx < len(units) - 1:
        value /= 1024.0
        unit_idx += 1

    if value >= 100:
        decimals = 0
    elif value >= 10:
        decimals = 1
    else:
        decimals = 2
    return f"{value:.{decimals}f} {units[unit_idx]}"
def binary_tick_values(max_val: float, ticks: int = 5) -> tuple[list[float], list[str]]:
    """
    Evenly spaced axis ticks from 0 to ``max_val`` with byte-unit labels.

    Returns ``(positions, labels)`` with ``ticks`` entries each; a
    non-positive ``max_val`` yields the single tick ``0.0`` / ``"0 B"``.
    """
    if max_val <= 0:
        return [0.0], ["0 B"]

    intervals = max(ticks - 1, 1)
    spacing = max_val / intervals
    positions: list[float] = []
    captions: list[str] = []
    for tick_no in range(ticks):
        position = round(spacing * tick_no, 6)
        positions.append(position)
        captions.append(format_bytes_binary(position))
    return positions, captions
def build_metric_figure(
    series: list[dict],
    label: str,
    metric_key: str,
    show_ideal: bool = True,
) -> go.Figure:
    """
    Build the line chart for one metric across worker counts.

    Each entry of ``series`` is a dict with the keys ``name`` (legend text),
    ``color`` and ``agg`` (a frame with ``pipeline_workers``, ``value_mean``,
    ``value_min`` and ``value_max`` columns). Every non-empty entry becomes a
    line with min/max error bars. For ``throughput_value`` an "Ideal speedup"
    line is added when ``show_ideal`` is true: the first non-empty series'
    mean at its smallest worker count, scaled linearly with the worker count.
    Byte metrics get hover text and y-axis ticks in binary units.

    An empty (but styled) figure is returned when no entry has data.
    """
    hover_fmt = ".3f" if metric_key == "state_amplification_value" else ".3s"
    is_bytes = metric_key in BYTE_METRICS
    # Layout shared by the empty and the populated figure.
    base_layout = dict(
        title=None,
        xaxis_title="Pipeline workers",
        yaxis_title=label,
        font=dict(family="DM Sans, Segoe UI, sans-serif", color=TEXT_COLOR),
        paper_bgcolor="#FFFFFF",
        plot_bgcolor="#FFFFFF",
        margin=dict(l=40, r=20, t=20, b=40),
        height=320,
        autosize=True,
    )

    fig = go.Figure()
    has_data = False
    max_val = None
    for entry in series:
        agg = entry["agg"]
        if agg.empty:
            continue
        has_data = True
        agg_sorted = agg.sort_values("pipeline_workers")
        err_plus = agg_sorted["value_max"] - agg_sorted["value_mean"]
        err_minus = agg_sorted["value_mean"] - agg_sorted["value_min"]
        if is_bytes:
            # Pre-formatted means so the hover box shows KiB/MiB/... units.
            customdata = [format_bytes_binary(v) for v in agg_sorted["value_mean"]]
            hovertemplate = (
                f"{entry['name']}<br>Workers=%{{x}}<br>Mean=%{{customdata}}"
            )
        else:
            customdata = None
            hovertemplate = (
                f"{entry['name']}<br>Workers=%{{x}}<br>Mean=%{{y:{hover_fmt}}}"
            )
        series_max = agg_sorted["value_max"].max()
        # An all-NaN series has a NaN maximum; keep it out of the tick range.
        if pd.notna(series_max):
            max_val = series_max if max_val is None else max(max_val, series_max)
        fig.add_trace(
            go.Scatter(
                x=agg_sorted["pipeline_workers"],
                y=agg_sorted["value_mean"],
                mode="lines+markers",
                name=entry["name"],
                line=dict(color=entry["color"], width=2),
                marker=dict(size=7, color=entry["color"]),
                customdata=customdata,
                error_y=dict(
                    type="data",
                    symmetric=False,
                    array=err_plus,
                    arrayminus=err_minus,
                    thickness=1.2,
                    width=3,
                ),
                hovertemplate=hovertemplate,
            )
        )

    if not has_data:
        fig.update_layout(showlegend=False, **base_layout)
        return fig

    if metric_key == "throughput_value" and show_ideal:
        primary_series = next(
            (entry for entry in series if not entry["agg"].empty), None
        )
        if primary_series is not None:
            agg_sorted = primary_series["agg"].sort_values("pipeline_workers")
            baseline_workers = agg_sorted["pipeline_workers"].min()
            baseline_row = agg_sorted[
                agg_sorted["pipeline_workers"] == baseline_workers
            ]
            # A zero baseline would divide by zero; a NaN one matches no row.
            if not baseline_row.empty and baseline_workers:
                baseline_value = baseline_row["value_mean"].iloc[0]
                ideal = baseline_value * (
                    agg_sorted["pipeline_workers"] / baseline_workers
                )
                fig.add_trace(
                    go.Scatter(
                        x=agg_sorted["pipeline_workers"],
                        y=ideal,
                        mode="lines",
                        name="Ideal speedup",
                        line=dict(color=ACCENT_COLOR_ALT, width=2, dash="dash"),
                        hovertemplate=(
                            "Ideal speedup<br>Workers=%{x}<br>"
                            f"Ideal=%{{y:{hover_fmt}}}"
                        ),
                    )
                )

    fig.update_layout(
        # A legend is only useful once there is more than one trace.
        showlegend=len(fig.data) > 1,
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.08,
            xanchor="left",
            x=0,
            font=dict(size=11),
        ),
        **base_layout,
    )
    fig.update_xaxes(showgrid=True, gridcolor=GRID_COLOR, zeroline=False)
    fig.update_yaxes(
        showgrid=True, gridcolor=GRID_COLOR, zeroline=False, rangemode="tozero"
    )
    if is_bytes and max_val is not None:
        tickvals, ticktext = binary_tick_values(max_val)
        fig.update_yaxes(tickvals=tickvals, ticktext=ticktext)
    return fig
def build_mem_bw_figure(
    primary_df: pd.DataFrame,
    compare_df: pd.DataFrame | None,
    primary_label: str,
    compare_label: str | None = None,
) -> go.Figure | None:
    """
    Build a grouped bar chart of read/write/total memory bandwidth.

    For each of ``mem_bw_read_mean``, ``mem_bw_write_mean`` and
    ``mem_bw_total_mean`` the values are aggregated per worker count and
    drawn as bars with min/max error bars. With a compare set (and label),
    a second, semi-transparent bar group is added per metric, and both sets
    are restricted to the worker counts they share when there are any.
    Dashed lines mark ``MEM_BW_WARN_GBS`` and ``MEM_BW_MAX_GBS``.

    Returns ``None`` when ``primary_df`` lacks one of the three columns or
    no metric has any rows to plot.
    """
    metric_defs = [
        ("mem_bw_read_mean", "Read", "#3B82F6"),
        ("mem_bw_write_mean", "Write", "#F59E0B"),
        ("mem_bw_total_mean", "Total", "#10B981"),
    ]
    if not all(key in primary_df.columns for key, _label, _color in metric_defs):
        return None

    fig = go.Figure()

    def plottable(df: pd.DataFrame, metric_key: str) -> pd.DataFrame:
        """Per-worker aggregate, sorted, without the missing-worker group."""
        agg = aggregate_metric(df, metric_key)
        # The aggregate keeps a NaN worker group; int(NaN) would raise below.
        agg = agg.dropna(subset=["pipeline_workers"])
        return agg.sort_values("pipeline_workers")

    def add_bars(
        agg: pd.DataFrame,
        version_label: str,
        metric_label: str,
        color: str,
        role: str,
        opacity: float | None,
    ) -> float:
        """Add one bar trace and return the largest ``value_max`` drawn."""
        fig.add_trace(
            go.Bar(
                x=[str(int(v)) for v in agg["pipeline_workers"]],
                y=agg["value_mean"],
                name=f"{version_label} {metric_label}",
                marker_color=color,
                offsetgroup=f"{metric_label}-{role}",
                opacity=opacity,
                error_y=dict(
                    type="data",
                    symmetric=False,
                    array=agg["value_max"] - agg["value_mean"],
                    arrayminus=agg["value_mean"] - agg["value_min"],
                    thickness=1.2,
                    width=3,
                ),
                # Plotly hover text uses <br> for line breaks.
                hovertemplate=(
                    f"{version_label} {metric_label}<br>"
                    "Workers=%{x}<br>"
                    "Mean=%{y:.2f} GB/s"
                ),
            )
        )
        return agg["value_max"].max()

    worker_labels: list[str] = []
    peaks: list[float] = []
    has_data = False
    for metric_key, metric_label, color in metric_defs:
        primary_agg = plottable(primary_df, metric_key)
        if primary_agg.empty:
            continue

        compare_agg = None
        if (
            compare_df is not None
            and compare_label
            and metric_key in compare_df.columns
        ):
            candidate = plottable(compare_df, metric_key)
            if not candidate.empty:
                compare_agg = candidate
        if compare_agg is not None:
            common_workers = set(primary_agg["pipeline_workers"]) & set(
                compare_agg["pipeline_workers"]
            )
            if common_workers:
                primary_agg = primary_agg[
                    primary_agg["pipeline_workers"].isin(common_workers)
                ]
                compare_agg = compare_agg[
                    compare_agg["pipeline_workers"].isin(common_workers)
                ]

        if not worker_labels:
            # The first plotted metric decides the x-axis tick labels.
            worker_labels = [str(int(v)) for v in primary_agg["pipeline_workers"]]
        peaks.append(
            add_bars(primary_agg, primary_label, metric_label, color, "primary", None)
        )
        has_data = True
        if compare_agg is not None and compare_label:
            peaks.append(
                add_bars(
                    compare_agg, compare_label, metric_label, color, "compare", 0.55
                )
            )

    if not has_data:
        return None

    fig.update_layout(
        barmode="group",
        margin=dict(l=40, r=16, t=20, b=40),
        height=320,
        paper_bgcolor="#FFFFFF",
        plot_bgcolor="#FFFFFF",
        font=dict(family="DM Sans, Segoe UI, sans-serif", color=TEXT_COLOR),
        showlegend=True,
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.08,
            xanchor="left",
            x=0,
            font=dict(size=11),
        ),
    )
    fig.update_yaxes(
        title="GB/s",
        showgrid=True,
        gridcolor=GRID_COLOR,
        zeroline=False,
        rangemode="tozero",
    )
    # NaN peaks (all values missing) must not poison the axis range.
    finite_peaks = [peak for peak in peaks if pd.notna(peak)]
    if finite_peaks:
        range_max = max(max(finite_peaks) * 1.2, MEM_BW_MAX_GBS * 1.05)
        fig.update_yaxes(range=[0, range_max])
    fig.update_xaxes(
        showgrid=False,
        zeroline=False,
        title="Pipeline workers",
        tickmode="array",
        tickvals=worker_labels,
        ticktext=worker_labels,
        type="category",
    )

    # (threshold, line color, annotation color) for the two reference lines.
    thresholds = [
        (MEM_BW_WARN_GBS, "#F4E19E", "#8A7A2E"),
        (MEM_BW_MAX_GBS, "#F7A1A1", "#9A2C2C"),
    ]
    for level, line_color, _text_color in thresholds:
        fig.add_shape(
            type="line",
            xref="paper",
            x0=0,
            x1=1,
            y0=level,
            y1=level,
            line=dict(color=line_color, width=2, dash="dash"),
        )
    for level, _line_color, text_color in thresholds:
        fig.add_annotation(
            x=1.02,
            y=level,
            xref="paper",
            yref="y",
            text=f"{level} GB/s",
            showarrow=False,
            font=dict(size=10, color=text_color),
        )
    return fig
def load_inline_css() -> str:
    """
    Return the text of ``dashboard.css`` located next to this file.

    An empty string is returned when the file cannot be read.
    """
    stylesheet = Path(__file__).with_name("dashboard.css")
    try:
        css_text = stylesheet.read_text(encoding="utf-8")
    except OSError:
        css_text = ""
    return css_text
app = Dash(__name__)
app.title = "Feldera Bench Dashboard"
# Callbacks target components (graphs, tabs) that other callbacks create.
app.config.suppress_callback_exceptions = True
# Custom page template. Dash requires the {%app_entry%}, {%config%},
# {%scripts%} and {%renderer%} placeholders; the doubled braces keep them
# literal inside this f-string. The stylesheet next to this script is inlined
# so the dashboard works without an assets folder.
# NOTE(review): markup reconstructed from Dash's default template - confirm
# against the intended page structure.
app.index_string = f"""<!DOCTYPE html>
<html>
    <head>
        {{%metas%}}
        <title>{{%title%}}</title>
        {{%favicon%}}
        {{%css%}}
        <style>{load_inline_css()}</style>
    </head>
    <body>
        {{%app_entry%}}
        <footer>
            {{%config%}}
            {{%scripts%}}
            {{%renderer%}}
        </footer>
    </body>
</html>
"""
# Static page layout: a header, a controls column (CSV upload and version
# pickers) and a content column (program tabs, SQL card with the payload
# slider, and the metric graph grid). The placeholders below are filled in
# by the callbacks defined further down.
app.layout = html.Div(
    className="app",
    children=[
        # Holds the prepared data as JSON; written by handle_load and read by
        # every other callback.
        dcc.Store(id="data-store"),
        html.Div(
            className="app-header",
            children=[
                html.Div(
                    className="header-left",
                    children=[
                        html.Img(
                            src="https://www.feldera.com/images/Logomark.png",
                            alt="Feldera",
                            className="brand-logo",
                        ),
                        html.Div(
                            children=[
                                html.H1("Benchmark Dashboard", className="app-title"),
                                html.P(
                                    "Explore throughput, memory, storage, buffered input, and state amplification.",
                                    className="app-subtitle",
                                ),
                            ],
                        ),
                    ],
                ),
                html.Div(
                    className="header-right",
                    children=[
                        # Load result / error message from handle_load.
                        html.Div(id="load-status", className="status-pill"),
                    ],
                ),
            ],
        ),
        html.Div(
            className="layout",
            children=[
                html.Div(
                    className="controls",
                    children=[
                        html.Div(
                            className="control-group",
                            children=[
                                html.Label("Data Source", className="control-label"),
                                # Uploading a file triggers handle_load.
                                dcc.Upload(
                                    id="csv-upload",
                                    className="upload",
                                    children=html.Button(
                                        "Add CSV results",
                                        className="button button-outline",
                                    ),
                                    multiple=False,
                                ),
                                html.Div(
                                    "Default CSV always loads; uploaded CSV is appended.",
                                    className="control-help",
                                ),
                            ],
                        ),
                        html.Div(
                            className="control-group",
                            children=[
                                html.Label("Versions", className="control-label"),
                                html.Div(
                                    className="control-subgroup",
                                    children=[
                                        html.Label(
                                            "Primary",
                                            className="control-label control-label-small",
                                        ),
                                        # Options and value come from
                                        # update_version_dropdown.
                                        dcc.Dropdown(
                                            id="version-primary-dropdown",
                                            className="control-dropdown",
                                            clearable=False,
                                        ),
                                    ],
                                ),
                                html.Div(
                                    className="control-subgroup",
                                    children=[
                                        html.Label(
                                            "Compare",
                                            className="control-label control-label-small",
                                        ),
                                        # Optional second version; an empty
                                        # value means "no comparison".
                                        dcc.Dropdown(
                                            id="version-compare-dropdown",
                                            className="control-dropdown",
                                            clearable=True,
                                            placeholder="None",
                                        ),
                                    ],
                                ),
                                html.Div(
                                    "Compare metrics across two platform versions.",
                                    className="control-help",
                                ),
                            ],
                        ),
                    ],
                ),
                html.Div(
                    className="content",
                    children=[
                        html.Div(
                            className="tabs-card",
                            # One tab per program; filled by update_program_tabs.
                            children=[dcc.Tabs(id="program-tabs", className="tabs")],
                        ),
                        html.Div(
                            id="sql-block",
                            className="sql-card",
                            children=[
                                html.Div("Program SQL", className="card-title"),
                                # Replaced by update_graphs with the program SQL.
                                dcc.Markdown(
                                    "```sql\nLoading...\n```",
                                    id="sql-content",
                                    className="sql-markdown",
                                ),
                                html.Div(
                                    id="payload-slider-wrapper",
                                    className="payload-slider",
                                    children=[
                                        html.Label(
                                            "Payload Record Size [Bytes]",
                                            className="control-label control-label-small",
                                        ),
                                        # Starts disabled; update_payload_slider
                                        # sets range, marks and visibility. The
                                        # slider value is an index into the list
                                        # of payload sizes, not a size itself.
                                        dcc.Slider(
                                            id="payload-slider",
                                            min=0,
                                            max=0,
                                            step=1,
                                            value=0,
                                            marks={0: "0"},
                                            included=False,
                                            disabled=True,
                                            tooltip={
                                                "placement": "bottom",
                                                "always_visible": False,
                                            },
                                        ),
                                    ],
                                ),
                            ],
                        ),
                        # Placeholder card shown until update_graphs replaces
                        # the children of this grid.
                        html.Div(
                            id="metric-graphs",
                            className="grid",
                            children=[
                                html.Div(
                                    className="card",
                                    children=[
                                        html.Div("IO Writes", className="card-title"),
                                        html.Div("Loading...", className="card empty"),
                                    ],
                                )
                            ],
                        ),
                    ],
                ),
            ],
        ),
    ],
)
@app.callback(
    Output("data-store", "data"),
    Output("load-status", "children"),
    Input("csv-upload", "contents"),
    State("csv-upload", "filename"),
    prevent_initial_call=False,
)
def handle_load(upload_contents, upload_filename):
    """
    Load the default CSV, optionally append an uploaded CSV, and publish
    the prepared rows as JSON together with a status message.

    ``upload_contents`` is the upload component's payload (a header and
    base64 data separated by the first comma); ``upload_filename`` is only
    used in the status message.

    Returns ``(json_or_None, status_text)``. The data is ``None`` only when
    the default CSV is missing or unusable; if just the upload fails, the
    default data is kept and the status text reports the upload error.
    """
    path = DEFAULT_DATA_PATH
    if not os.path.exists(path):
        return None, f"Missing file: {path}"
    try:
        default_df = load_csv(path)
        # Preparing inside the try reports schema problems (e.g. a missing
        # version column) as a status message instead of a callback error.
        default_json = prepare_data(default_df).to_json(orient="split")
    except Exception as exc:
        return None, f"Failed to load default CSV: {exc}"

    if not upload_contents:
        return default_json, f"Loaded {len(default_df):,} rows."

    try:
        _content_type, content_string = upload_contents.split(",", 1)
        decoded = base64.b64decode(content_string)
        upload_df = pd.read_csv(StringIO(decoded.decode("utf-8")))
        combined = pd.concat([default_df, upload_df], ignore_index=True)
        combined_json = prepare_data(combined).to_json(orient="split")
    except Exception as exc:
        # Keep showing the default data: a bad upload should not blank the
        # dashboard.
        return default_json, f"Failed to load upload: {exc}"

    label = upload_filename or "uploaded file"
    return (
        combined_json,
        f"Loaded {len(default_df):,} rows + {len(upload_df):,} rows from {label}.",
    )
@app.callback(
    Output("version-primary-dropdown", "options"),
    Output("version-primary-dropdown", "value"),
    Output("version-compare-dropdown", "options"),
    Output("version-compare-dropdown", "value"),
    Input("data-store", "data"),
    State("version-primary-dropdown", "value"),
    State("version-compare-dropdown", "value"),
)
def update_version_dropdown(data, primary_value, compare_value):
    """
    Refresh both version dropdowns after the stored data changes.

    The current selections are kept when they still exist in the data;
    otherwise the primary falls back to the first version (sorted) and the
    compare selection is cleared. The compare selection is also cleared when
    it equals the primary one. The compare options start with a "None"
    entry whose value is the empty string.
    """
    if not data:
        return [], None, [], None

    frame = pd.read_json(StringIO(data), orient="split")
    versions = sorted(frame[VERSION_COLUMN].dropna().unique().tolist())
    if not versions:
        return [], None, [], None

    options = [{"label": name, "value": name} for name in versions]
    compare_options = [{"label": "None", "value": ""}, *options]

    primary = versions[0] if primary_value not in versions else primary_value
    keep_compare = compare_value in versions and compare_value != primary
    compare = (compare_value if keep_compare else None) or ""
    return options, primary, compare_options, compare
@app.callback(
    Output("program-tabs", "children"),
    Output("program-tabs", "value"),
    Input("data-store", "data"),
    Input("version-primary-dropdown", "value"),
    Input("version-compare-dropdown", "value"),
)
def update_program_tabs(data, primary_version, compare_version):
    """
    Build one tab per program and select the first one.

    Programs are the ``pipeline_name`` values of the primary version; when a
    different compare version is chosen, only programs present in both
    versions are offered. Tabs are sorted by name.
    """
    if not (data and primary_version):
        return [], None

    frame = pd.read_json(StringIO(data), orient="split")

    def names_for(version):
        rows = frame[frame[VERSION_COLUMN] == version]
        return set(rows["pipeline_name"].dropna().unique().tolist())

    names = names_for(primary_version)
    if compare_version and compare_version != primary_version:
        names = names & names_for(compare_version)

    ordered = sorted(names)
    tabs = [dcc.Tab(label=name, value=name) for name in ordered]
    return tabs, (ordered[0] if ordered else None)
@app.callback(
    Output("payload-slider", "min"),
    Output("payload-slider", "max"),
    Output("payload-slider", "marks"),
    Output("payload-slider", "value"),
    Output("payload-slider", "step"),
    Output("payload-slider", "disabled"),
    Output("payload-slider-wrapper", "style"),
    Input("program-tabs", "value"),
    Input("version-primary-dropdown", "value"),
    Input("version-compare-dropdown", "value"),
    Input("data-store", "data"),
)
def update_payload_slider(program, primary_version, compare_version, data):
    """
    Configure the payload-size slider for the selected program.

    The slider positions are indices into the payload sizes available for
    the selection (shared sizes when comparing two versions), labelled with
    the formatted size. With fewer than two sizes the slider is disabled
    and its wrapper hidden.
    """

    def hidden(mark):
        # Disabled single-position slider inside a hidden wrapper.
        return 0, 0, {0: mark}, 0, None, True, {"display": "none"}

    if not (data and program and primary_version):
        return hidden("0")

    frame = pd.read_json(StringIO(data), orient="split")
    for_program = frame["pipeline_name"] == program
    primary_df = frame[(frame[VERSION_COLUMN] == primary_version) & for_program].copy()
    compare_df = None
    if compare_version and compare_version != primary_version:
        compare_df = frame[
            (frame[VERSION_COLUMN] == compare_version) & for_program
        ].copy()

    values, labels = payload_values_for_versions(primary_df, compare_df)
    if not values:
        return hidden("0")
    if len(values) == 1:
        return hidden(labels[0])

    marks = dict(enumerate(labels))
    return 0, len(values) - 1, marks, 0, 1, False, {}
@app.callback(
    Output("sql-content", "children"),
    Output("metric-graphs", "children"),
    Input("program-tabs", "value"),
    Input("payload-slider", "value"),
    Input("version-primary-dropdown", "value"),
    Input("version-compare-dropdown", "value"),
    Input("data-store", "data"),
)
def update_graphs(program, payload_idx, primary_version, compare_version, data):
    """
    Render the SQL card text and all metric cards for the current selection.

    ``payload_idx`` is the slider position, i.e. an index into the list
    returned by ``payload_values_for_versions``; it narrows the plotted rows
    to one payload size. The SQL shown is always derived from all rows of
    the program, regardless of the slider.

    Returns ``(sql_markdown, cards)``.
    """
    if not data:
        empty_gauge = build_gauge_card(
            figures=[None],
        )
        return (
            "No SQL available.",
            html.Div(
                [
                    html.Div("No data loaded.", className="card empty"),
                    empty_gauge,
                ],
                className="grid",
            ),
        )
    df = pd.read_json(StringIO(data), orient="split")
    if not program or not primary_version:
        return (
            "Select a program tab.",
            html.Div(
                [
                    html.Div("Select a program tab.", className="card empty"),
                    build_gauge_card(
                        figures=[None],
                    ),
                ],
                className="grid",
            ),
        )
    # Comparing a version with itself counts as "no comparison".
    compare_active = compare_version and compare_version != primary_version
    program_df_all = df[
        (df[VERSION_COLUMN] == primary_version) & (df["pipeline_name"] == program)
    ].copy()
    compare_df_all = None
    if compare_active:
        compare_df_all = df[
            (df[VERSION_COLUMN] == compare_version) & (df["pipeline_name"] == program)
        ].copy()
    # The *_all frames keep every payload size; program_df / compare_df are
    # narrowed below to the size selected on the slider.
    program_df = program_df_all
    compare_df = compare_df_all
    if payload_idx is not None:
        values, _labels = payload_values_for_versions(program_df_all, compare_df_all)
        if values and isinstance(payload_idx, (int, float)):
            idx = int(payload_idx)
            # An out-of-range index (e.g. left over from another program)
            # leaves the rows unfiltered.
            if 0 <= idx < len(values):
                payload_value = values[idx]
                if "payload_bytes" in program_df.columns:
                    if payload_value in program_df["payload_bytes"].unique():
                        program_df = program_df[
                            program_df["payload_bytes"] == payload_value
                        ]
                        if compare_df is not None:
                            compare_df = compare_df[
                                compare_df["payload_bytes"] == payload_value
                            ]
                elif "payload_kib" in program_df.columns:
                    program_df = program_df[program_df["payload_kib"] == payload_value]
                    if compare_df is not None:
                        compare_df = compare_df[
                            compare_df["payload_kib"] == payload_value
                        ]
    gauge_df = program_df
    compare_gauge_df = compare_df
    sql_primary = extract_program_sql(program_df_all)
    sql_compare = (
        extract_program_sql(compare_df_all) if compare_df_all is not None else None
    )
    sql_block = f"```sql\n{sql_primary}\n```"
    if sql_compare is not None:
        # Identical programs are shown once with a combined header; differing
        # programs are shown as two code blocks.
        if sql_compare.strip() == sql_primary.strip():
            sql_block = (
                f"```sql\n-- {primary_version} vs {compare_version}\n{sql_primary}\n```"
            )
        else:
            sql_block = (
                f"```sql\n-- {primary_version}\n{sql_primary}\n```\n"
                f"```sql\n-- {compare_version}\n{sql_compare}\n```"
            )
    write_fig = build_write_barplot(
        gauge_df,
        compare_gauge_df if compare_active else None,
        str(primary_version),
        str(compare_version) if compare_active else None,
    )
    write_gauge_figs = [write_fig]
    graphs = []
    gauge_added = False
    for idx, (metric_key, label) in enumerate(METRICS):
        if metric_key not in program_df.columns:
            graphs.append(
                html.Div(
                    f"Missing metric: {metric_key}",
                    className="card empty",
                    # Each card's animation starts 60 ms after the previous one.
                    style={"animationDelay": f"{len(graphs) * 60}ms"},
                )
            )
            continue
        series = [
            {
                "name": str(primary_version),
                "agg": aggregate_metric(program_df, metric_key),
                "color": ACCENT_COLOR,
            }
        ]
        if compare_active and compare_df is not None:
            series.append(
                {
                    "name": str(compare_version),
                    "agg": aggregate_metric(compare_df, metric_key),
                    "color": ACCENT_COLOR_ALT,
                }
            )
        fig = build_metric_figure(series, label, metric_key, show_ideal=True)
        graphs.append(
            html.Div(
                className="card",
                style={"animationDelay": f"{len(graphs) * 60}ms"},
                children=[
                    html.Div(label, className="card-title"),
                    dcc.Graph(
                        figure=fig,
                        config={"displayModeBar": False, "responsive": True},
                        className="graph",
                        style={"height": "320px", "width": "100%"},
                    ),
                ],
            )
        )
        # The IO-writes card is placed directly after the storage chart.
        if metric_key == "storage_value" and any(write_gauge_figs):
            graphs.append(
                build_gauge_card(
                    write_gauge_figs,
                    animation_delay=len(graphs) * 60,
                    title="IO Writes",
                )
            )
            gauge_added = True
    # If it was not placed above, append the card after all metric charts.
    if not gauge_added:
        graphs.append(
            build_gauge_card(
                write_gauge_figs,
                animation_delay=len(graphs) * 60,
                title="IO Writes",
            )
        )
    mem_bw_fig = build_mem_bw_figure(
        program_df,
        compare_df if compare_active else None,
        str(primary_version),
        str(compare_version) if compare_active else None,
    )
    if mem_bw_fig is not None:
        graphs.append(
            html.Div(
                className="card",
                style={"animationDelay": f"{len(graphs) * 60}ms"},
                children=[
                    html.Div("Memory Bandwidth [GB/s]", className="card-title"),
                    dcc.Graph(
                        figure=mem_bw_fig,
                        config={"displayModeBar": False, "responsive": True},
                        className="graph",
                        style={"height": "320px", "width": "100%"},
                    ),
                ],
            )
        )
    return sql_block, graphs
if __name__ == "__main__":
    # Command-line entry point: pick the default CSV, then start the server.
    cli = argparse.ArgumentParser(description="Run Feldera benchmark dashboard.")
    cli.add_argument(
        "--data",
        default=DEFAULT_DATA_PATH,
        help="Path to the benchmark CSV to load by default.",
    )
    options = cli.parse_args()
    # handle_load reads this module-level name each time it runs.
    DEFAULT_DATA_PATH = str(Path(options.data).expanduser())
    app.run(debug=True)