Skip to content

Commit 2c62b0b

Browse files
committed
Add more merger metrics (batches per level and in-progress merges).
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 9966165 commit 2c62b0b

File tree

4 files changed

+171
-8
lines changed

4 files changed

+171
-8
lines changed

crates/dbsp/src/circuit/metrics.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub const COMPACTION_SIZE_SAVINGS: &str = "file.compaction_size";
5757
/// Compaction duration for a single batch.
5858
pub const COMPACTION_DURATION: &str = "file.compaction_duration";
5959

60-
/// Time a worker was stalled waiting for more merges to complete.
60+
/// Time in nanoseconds a worker was stalled waiting for more merges to complete.
6161
pub const COMPACTION_STALL_TIME: &str = "file.compaction_stall_time";
6262

6363
/// Number of records dropped due to LATENESS annotations
@@ -66,6 +66,12 @@ pub const TOTAL_LATE_RECORDS: &str = "records.late";
6666
/// Runtime in microseconds of an Operator evaluation
6767
pub const OPERATOR_EVAL_DURATION: &str = "operator.runtime_micros";
6868

69+
/// Number of batches in the spines at each level.
70+
pub const BATCHES_PER_LEVEL: &str = "spine.batches_per_level";
71+
72+
/// Number of pending merges in spines at each level.
73+
pub const ONGOING_MERGES_PER_LEVEL: &str = "spine.ongoing_merges";
74+
6975
/// Creates the appropriate metric name for this metric.
7076
/// As these metrics are DBSP related, they are prefixed with `dbsp_`.
7177
fn metric_name(name: &str) -> String {

crates/dbsp/src/circuit/runtime.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,26 @@ impl Runtime {
502502
WORKER_INDEX.get()
503503
}
504504

505+
/// Returns the worker index as a string.
506+
///
507+
/// This is useful for metric labels.
508+
pub fn worker_index_str() -> &'static str {
509+
static WORKER_INDEX_STRS: Lazy<[&'static str; 256]> = Lazy::new(|| {
510+
let mut data: [&'static str; 256] = [""; 256];
511+
for (i, item) in data.iter_mut().enumerate() {
512+
*item = Box::leak(i.to_string().into_boxed_str());
513+
}
514+
data
515+
});
516+
517+
WORKER_INDEX_STRS
518+
.get(WORKER_INDEX.get())
519+
.copied()
520+
.unwrap_or_else(|| {
521+
panic!("Limit workers to less than 256 or increase the limit in the code.")
522+
})
523+
}
524+
505525
/// Returns the minimum number of bytes in a batch to spill it to storage,
506526
/// or `None` if this thread doesn't have a [Runtime] or if it doesn't have
507527
/// storage configured.

crates/dbsp/src/trace/spine_async/mod.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ use crate::{
1515
cursor::CursorList, merge_batches, Batch, BatchReader, BatchReaderFactories, Cursor,
1616
Filter, Trace,
1717
},
18-
Error, NumEntries,
18+
Error, NumEntries, Runtime,
1919
};
2020

2121
use crate::storage::file::to_bytes;
2222
use crate::storage::write_commit_metadata;
2323
pub use crate::trace::spine_async::snapshot::SpineSnapshot;
2424
use crate::trace::CommittedSpine;
25-
use metrics::counter;
25+
use metrics::{counter, gauge};
2626
use ouroboros::self_referencing;
2727
use rand::Rng;
2828
use rkyv::{
@@ -44,19 +44,23 @@ use std::{
4444
sync::Condvar,
4545
};
4646
use textwrap::indent;
47+
use uuid::Uuid;
4748

4849
mod list_merger;
4950
mod snapshot;
5051
mod thread;
5152

5253
use self::thread::{BackgroundThread, WorkerStatus};
5354
use super::BatchLocation;
54-
use crate::circuit::metrics::COMPACTION_STALL_TIME;
55+
use crate::circuit::metrics::{BATCHES_PER_LEVEL, COMPACTION_STALL_TIME, ONGOING_MERGES_PER_LEVEL};
5556
use list_merger::{ListMerger, ListMergerBuilder};
5657

5758
/// Maximum amount of levels in the spine.
5859
pub(crate) const MAX_LEVELS: usize = 9;
5960

61+
/// Levels as &'static str for metrics
62+
pub(crate) const LEVELS_AS_STR: [&str; MAX_LEVELS] = ["0", "1", "2", "3", "4", "5", "6", "7", "8"];
63+
6064
impl<B: Batch + Send + Sync> From<(Vec<String>, &Spine<B>)> for CommittedSpine<B> {
6165
fn from((batches, spine): (Vec<String>, &Spine<B>)) -> Self {
6266
CommittedSpine {
@@ -162,6 +166,9 @@ where
162166
request_exit: bool,
163167
#[size_of(skip)]
164168
merge_stats: MergeStats,
169+
/// Unique identifier for the spine for metrics.
170+
#[size_of(skip)]
171+
ident: &'static str,
165172
}
166173

167174
impl<B> SharedState<B>
@@ -176,6 +183,7 @@ where
176183
slots: std::array::from_fn(|_| Slot::default()),
177184
request_exit: false,
178185
merge_stats: MergeStats::default(),
186+
ident: String::leak(Uuid::now_v7().to_string()),
179187
}
180188
}
181189

@@ -184,9 +192,7 @@ where
184192
fn add_batches(&mut self, batches: impl IntoIterator<Item = Arc<B>>) {
185193
for batch in batches {
186194
if !batch.is_empty() {
187-
self.slots[Spine::<B>::size_to_level(batch.len())]
188-
.loose_batches
189-
.push_back(batch);
195+
self.add_batch(batch);
190196
}
191197
}
192198
}
@@ -197,6 +203,9 @@ where
197203
debug_assert!(!batch.is_empty());
198204
let level = Spine::<B>::size_to_level(batch.len());
199205
self.slots[level].loose_batches.push_back(batch);
206+
207+
gauge!(BATCHES_PER_LEVEL, "worker" => Runtime::worker_index_str(), "level" => LEVELS_AS_STR[level], "id" => self.ident)
208+
.set(self.slots[level].n_batches() as f64);
200209
}
201210

202211
fn should_apply_backpressure(&self) -> bool {
@@ -256,6 +265,8 @@ where
256265
let cache_stats = batches.iter().fold(CacheStats::default(), |stats, batch| {
257266
stats + batch.cache_stats()
258267
});
268+
gauge!(ONGOING_MERGES_PER_LEVEL, "worker" => Runtime::worker_index_str(), "level" => LEVELS_AS_STR[level], "id" => self.ident)
269+
.set(0);
259270
self.merge_stats.report_merge(
260271
batches.iter().map(|b| b.len()).sum(),
261272
new_batch.len(),
@@ -396,7 +407,7 @@ where
396407
let start = Instant::now();
397408
let mut state = self.no_backpressure.wait(state).unwrap();
398409
state.merge_stats.backpressure_wait += start.elapsed();
399-
counter!(COMPACTION_STALL_TIME).increment(start.elapsed().as_secs());
410+
counter!(COMPACTION_STALL_TIME).increment(start.elapsed().as_nanos() as u64);
400411
}
401412
}
402413

@@ -514,6 +525,7 @@ where
514525
idle: &Arc<Condvar>,
515526
no_backpressure: &Arc<Condvar>,
516527
) -> WorkerStatus {
528+
let ident = state.lock().unwrap().ident;
517529
// Run in-progress merges.
518530
let ((key_filter, value_filter), frontier) = {
519531
let shared = state.lock().unwrap();
@@ -546,6 +558,9 @@ where
546558
.filter_map(|(level, slot)| slot.try_start_merge(level).map(|batches| (level, batches)))
547559
.collect::<Vec<_>>();
548560
for (level, batches) in start_merges {
561+
gauge!(ONGOING_MERGES_PER_LEVEL, "worker" => Runtime::worker_index_str(), "level" => LEVELS_AS_STR[level], "id" => ident)
562+
.set(batches.len() as f64);
563+
549564
let merger = ListMergerBuilder::with_capacity(batches.len())
550565
.with_batches(batches)
551566
.build();

scripts/plot_metrics.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import pandas as pd
2+
import json
3+
from plotnine import *
4+
5+
6+
def get_data_samples(file_path):
    """
    Read an NDJSON (newline-delimited JSON) file.

    :param file_path: Path to the NDJSON file
    :return: List of parsed JSON objects, one per input line
    """
    # NDJSON: every line is an independent JSON document, so the whole
    # file can be parsed with a single comprehension.
    with open(file_path, "r", encoding="utf-8") as file:
        return [json.loads(line.strip()) for line in file]
20+
21+
22+
# Function to process the data
def parse_data(data_samples):
    """
    Flatten metric samples into a long-format DataFrame.

    :param data_samples: list of samples; each sample is a list of metric
        entries with a "key", a "value" of the form ``{"Gauge": x}`` or
        ``{"Counter": x}``, and optional "labels" as ``[[name, value], ...]``.
    :return: DataFrame with columns timestamp, key, worker, level, value, id.
    """
    records = []
    # The sample index doubles as the timestamp (one sample per scrape).
    for timestamp, sample in enumerate(data_samples):
        for entry in sample:
            key = entry["key"]
            # Handle Gauge and Counter values. Compare against None
            # explicitly: a gauge legitimately reporting 0.0 is falsy, so
            # `get("Gauge") or get("Counter")` would silently discard it.
            value = entry["value"].get("Gauge")
            if value is None:
                value = entry["value"].get("Counter")
            labels = {label[0]: label[1] for label in entry.get("labels", [])}  # Convert labels to dict

            # Extract necessary information; entries without a label fall
            # into a synthetic "Total" bucket.
            worker = labels.get("worker", "Total")
            level = labels.get("level", "Total")
            spine = labels.get("id", "Total")

            # Append processed entry
            records.append({
                "timestamp": timestamp,
                "key": key,
                "worker": worker,
                "level": level,
                "value": value,
                "id": spine
            })

    return pd.DataFrame(records)
47+
48+
49+
def make_plots(data_samples):
    """
    Render spine-merge metric plots from parsed metric samples.

    Writes three PNG files to the working directory:
    - ongoing_merges.png: min/avg/max of batches being merged, per level
    - batches_per_level.png: min/avg/max of loose batches, per level
    - pipeline_totals.png: disk write rate plus total batch counts over time

    :param data_samples: list of metric samples as returned by
        get_data_samples
    """
    # Convert the data
    df = parse_data(data_samples)

    # Filter for specific metrics
    df_merges = df[df["key"] == "spine.ongoing_merges"]
    df_merges_summary = df_merges.groupby(["timestamp", "level"])["value"].agg(
        ["mean", "max", "min"]).reset_index().melt(
        id_vars=["timestamp", "level"], var_name="stat", value_name="value")

    df_batches = df[df["key"] == "spine.batches_per_level"]
    df_batches_summary = df_batches.groupby(["timestamp", "level"])["value"].agg(
        ["mean", "max", "min"]).reset_index().melt(
        id_vars=["timestamp", "level"], var_name="stat", value_name="value")

    # Get bytes written at every time
    df_disk = df[df["key"] == "disk.total_bytes_written"]
    df_disk = df_disk.sort_values("timestamp")
    # diff() turns the cumulative byte counter into a per-interval delta (MiB).
    df_disk["value"] = df_disk["value"].diff() / (1024 * 1024)
    df_disk = df_disk.dropna()
    df_disk["metric"] = "Writes MiB/s"

    # Aggregate total values per timestamp
    df_merges_total = df_merges.groupby("timestamp")["value"].sum().reset_index()
    df_merges_total["worker"] = "Total"
    df_merges_total["level"] = "Total"
    df_merges_total["metric"] = "Current #Batches being merged"

    df_batches_total = df_batches.groupby("timestamp")["value"].sum().reset_index()
    df_batches_total["worker"] = "Total"
    df_batches_total["level"] = "Total"
    df_batches_total["metric"] = "Current #Batches not being merged"

    # Merge with original to include total lines
    df_merges = pd.concat([df_merges_summary])
    df_batches = pd.concat([df_batches_summary])

    df_totals = pd.concat([df_disk, df_merges_total, df_batches_total])

    # Plot function
    def create_plot(df, title, filename):
        plot = (
            ggplot(df, aes(x="timestamp", y="value", color="stat", group="stat"))
            + geom_line(size=1)
            + facet_wrap("~level", scales="free")
            + labs(title=title, x="Time", y="Value")
            + theme_classic()
            + scale_y_continuous(limits=(0, None))
        )
        plot.save(filename, width=12, height=6, dpi=300)
        # Bug fix: the f-string had no placeholder, so the saved file
        # name was never actually reported.
        print(f"Saved {filename}")

    # Generate plots
    create_plot(df_merges, "Current #Batches being merged (Min/Avg/Max from all Spines)", "ongoing_merges.png")
    create_plot(df_batches, "Current #Batches not being merged (Min/Avg/Max from all Spine)", "batches_per_level.png")

    plot = (
        ggplot(df_totals, aes(x="timestamp", y="value", color="metric"))
        + geom_line(size=1)
        + facet_grid("metric ~ .", scales="free_y")  # Separate plots for MiB/s and Counts
        + labs(title="Pipeline Totals", x="Time", y="Value")
        + theme_classic()
        + scale_y_continuous(limits=(0, None))
    )

    # Save the plot
    plot.save("pipeline_totals.png", width=12, height=8, dpi=300)
116+
117+
118+
if __name__ == '__main__':
    import sys

    # Expect the path to an NDJSON metrics dump as the sole argument;
    # fail with a usage message rather than an IndexError.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <metrics.ndjson>")

    samples = get_data_samples(sys.argv[1])
    make_plots(samples)

0 commit comments

Comments
 (0)