Skip to content

Commit d70c655

Browse files
blp authored and mihaibudiu committed
[dbsp] Remove per-worker-thread metrics.
These metrics are meant for internal debugging purposes, not for ongoing monitoring. It is valuable to make them available on-demand, which we do via profile metadata, but they are not the kind of data that Prometheus is meant to continuously accept. This is partly a practical problem with Prometheus, which isn't suited for "high cardinality" data, meaning data with a high Cartesian product of its internal dimensions.

Partly addresses #4442.

Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent a2e5b0b commit d70c655

12 files changed

Lines changed: 53 additions & 929 deletions

File tree

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 0 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -1016,8 +1016,6 @@ pub trait Node: Any {
10161016

10171017
fn init(&mut self) {}
10181018

1019-
fn metrics(&self) {}
1020-
10211019
fn metadata(&self, output: &mut OperatorMeta);
10221020

10231021
fn fixedpoint(&self, scope: Scope) -> bool;
@@ -1268,26 +1266,6 @@ impl GlobalNodeId {
12681266
pub fn lir_node_id(&self) -> LirNodeId {
12691267
LirNodeId::new(&self.path_as_string())
12701268
}
1271-
1272-
pub(crate) fn metrics_id(&self) -> String {
1273-
let mut mid = String::with_capacity(3 + self.0.len() * 3);
1274-
mid.push('o');
1275-
1276-
let path = self.path();
1277-
1278-
if path.is_empty() {
1279-
mid.push_str("root");
1280-
} else {
1281-
for (i, node) in path.iter().enumerate() {
1282-
if i > 0 {
1283-
mid.push('_');
1284-
}
1285-
write!(&mut mid, "{}", node.0).unwrap();
1286-
}
1287-
}
1288-
1289-
mid
1290-
}
12911269
}
12921270

12931271
type CircuitEventHandler = Box<dyn Fn(&CircuitEvent)>;
@@ -3240,8 +3218,6 @@ where
32403218

32413219
circuit.nodes.borrow()[id.0].borrow_mut().eval().await?;
32423220

3243-
circuit.nodes.borrow()[id.0].borrow().metrics();
3244-
32453221
circuit.log_scheduler_event(&SchedulerEvent::eval_end(
32463222
circuit.nodes.borrow()[id.0].borrow().as_ref(),
32473223
));
@@ -4127,10 +4103,6 @@ where
41274103
self.operator.init(&self.id);
41284104
}
41294105

4130-
fn metrics(&self) {
4131-
self.operator.metrics()
4132-
}
4133-
41344106
fn metadata(&self, output: &mut OperatorMeta) {
41354107
self.operator.metadata(output);
41364108
}
@@ -4259,10 +4231,6 @@ where
42594231
self.operator.init(&self.id);
42604232
}
42614233

4262-
fn metrics(&self) {
4263-
self.operator.metrics();
4264-
}
4265-
42664234
fn metadata(&self, output: &mut OperatorMeta) {
42674235
self.operator.metadata(output);
42684236
}
@@ -4405,10 +4373,6 @@ where
44054373
self.operator.init(&self.id);
44064374
}
44074375

4408-
fn metrics(&self) {
4409-
self.operator.metrics();
4410-
}
4411-
44124376
fn metadata(&self, output: &mut OperatorMeta) {
44134377
self.operator.metadata(output);
44144378
}
@@ -4544,10 +4508,6 @@ where
45444508
self.operator.init(&self.id);
45454509
}
45464510

4547-
fn metrics(&self) {
4548-
self.operator.metrics();
4549-
}
4550-
45514511
fn metadata(&self, output: &mut OperatorMeta) {
45524512
self.operator.metadata(output);
45534513
}
@@ -4740,10 +4700,6 @@ where
47404700
self.operator.init(&self.id);
47414701
}
47424702

4743-
fn metrics(&self) {
4744-
self.operator.metrics();
4745-
}
4746-
47474703
fn metadata(&self, output: &mut OperatorMeta) {
47484704
self.operator.metadata(output);
47494705
}
@@ -4936,10 +4892,6 @@ where
49364892
self.operator.init(&self.id);
49374893
}
49384894

4939-
fn metrics(&self) {
4940-
self.operator.metrics()
4941-
}
4942-
49434895
fn metadata(&self, output: &mut OperatorMeta) {
49444896
self.operator.metadata(output);
49454897
}
@@ -5106,10 +5058,6 @@ where
51065058
self.operator.init(&self.id);
51075059
}
51085060

5109-
fn metrics(&self) {
5110-
self.operator.metrics();
5111-
}
5112-
51135061
fn metadata(&self, output: &mut OperatorMeta) {
51145062
self.operator.metadata(output);
51155063
}
@@ -5297,10 +5245,6 @@ where
52975245
self.operator.init(&self.id);
52985246
}
52995247

5300-
fn metrics(&self) {
5301-
self.operator.metrics();
5302-
}
5303-
53045248
fn metadata(&self, output: &mut OperatorMeta) {
53055249
self.operator.metadata(output);
53065250
}
@@ -5473,10 +5417,6 @@ where
54735417
self.operator.init(&self.id);
54745418
}
54755419

5476-
fn metrics(&self) {
5477-
self.operator.metrics();
5478-
}
5479-
54805420
fn metadata(&self, output: &mut OperatorMeta) {
54815421
self.operator.metadata(output);
54825422
}
@@ -5635,11 +5575,6 @@ where
56355575
self.operator.borrow_mut().init(&self.id);
56365576
}
56375577

5638-
fn metrics(&self) {
5639-
// Avoid producing duplicate metrics for input and output parts of the operator;
5640-
// otherwise it will be double-counted in circuit-level metrics.
5641-
}
5642-
56435578
fn metadata(&self, _output: &mut OperatorMeta) {
56445579
// Avoid producing duplicate metadata for input and output parts of the operator;
56455580
// otherwise it will be double-counted in circuit-level metrics.
@@ -5785,10 +5720,6 @@ where
57855720
self.operator.borrow_mut().init(&self.id);
57865721
}
57875722

5788-
fn metrics(&self) {
5789-
self.operator.borrow().metrics()
5790-
}
5791-
57925723
fn metadata(&self, output: &mut OperatorMeta) {
57935724
self.operator.borrow().metadata(output)
57945725
}

crates/dbsp/src/circuit/metrics.rs

Lines changed: 1 addition & 159 deletions
Original file line number | Diff line number | Diff line change
@@ -3,9 +3,7 @@
33
//! The constants defined in this module are the names of metrics that the
44
//! backends maintain via [`metrics`] crate interfaces.
55
6-
use crate::circuit::GlobalNodeId;
7-
use crate::Runtime;
8-
use ::metrics::{describe_counter, describe_gauge, describe_histogram, Unit as MetricUnit};
6+
use ::metrics::{describe_counter, describe_histogram, Unit as MetricUnit};
97

108
/// Total number of files created.
119
pub const FILES_CREATED: &str = "disk.total_files_created";
@@ -72,162 +70,6 @@ pub const BATCHES_PER_LEVEL: &str = "spine.batches_per_level";
7270
/// Number of pending merges in spines at each level.
7371
pub const ONGOING_MERGES_PER_LEVEL: &str = "spine.ongoing_merges";
7472

75-
/// Creates the appropriate metric name for this metric.
76-
/// As these metrics are DBSP related, they are prefixed with `dbsp_`.
77-
fn metric_name(name: &str) -> String {
78-
format!("dbsp_{}", name)
79-
}
80-
81-
/// A metric of type `Gauge`.
82-
///
83-
/// Gauge represents a single value that can go up or down over time, and always starts out
84-
/// with an initial value of zero.
85-
#[derive(Clone, size_of::SizeOf)]
86-
pub(crate) struct Gauge(#[size_of(skip)] metrics::Gauge);
87-
88-
impl Gauge {
89-
/// Describe and initialize a new [`Gauge`].
90-
///
91-
/// The following labels are set to this gauge by default:
92-
/// `worker` => Appropriate runtime worker thread index
93-
/// `gid` => The global node id of the current operator
94-
pub(crate) fn new(
95-
name: &str,
96-
description: Option<String>,
97-
unit: Option<&str>,
98-
gid: &GlobalNodeId,
99-
mut labels: Vec<(String, String)>,
100-
) -> Self {
101-
labels.push(("worker".to_owned(), Runtime::worker_index().to_string()));
102-
labels.push(("gid".to_owned(), gid.metrics_id()));
103-
104-
let unit: Option<metrics::Unit> = unit.and_then(metrics::Unit::from_string);
105-
let name = metric_name(name);
106-
let description = description.unwrap_or_default();
107-
108-
match unit {
109-
Some(unit) => {
110-
describe_gauge!(name.clone(), unit, description);
111-
}
112-
None => {
113-
describe_gauge!(name.clone(), description);
114-
}
115-
};
116-
117-
Self(metrics::gauge!(name, &labels[..]))
118-
}
119-
120-
/// Set the value of this [`Gauge`] to `value`.
121-
pub(crate) fn set(&self, value: f64) {
122-
self.0.set(value)
123-
}
124-
125-
/// Increment the value of this [`Gauge`] by `value`.
126-
#[allow(unused)]
127-
pub(crate) fn increment(&self, value: f64) {
128-
self.0.increment(value)
129-
}
130-
131-
/// Decrement the value of this [`Gauge`] by `value`.
132-
#[allow(unused)]
133-
pub(crate) fn decrement(&self, value: f64) {
134-
self.0.decrement(value)
135-
}
136-
}
137-
138-
/// Metric of type `Histogram`.
139-
///
140-
/// Histograms measure the distribution of values for a given set of measurements,
141-
/// and start with no initial values.
142-
#[derive(Clone, size_of::SizeOf)]
143-
pub(crate) struct Histogram(#[size_of(skip)] metrics::Histogram);
144-
145-
impl Histogram {
146-
/// Describe and initialize a new [`Histogram`].
147-
///
148-
/// The following labels are set to this histogram by default:
149-
/// `worker` => Appropriate runtime worker thread index
150-
/// `gid` => The global node id of the current operator
151-
#[allow(unused)]
152-
pub(crate) fn new(
153-
name: &str,
154-
description: Option<String>,
155-
unit: Option<&str>,
156-
gid: &GlobalNodeId,
157-
mut labels: Vec<(String, String)>,
158-
) -> Self {
159-
labels.push(("worker".to_owned(), Runtime::worker_index().to_string()));
160-
labels.push(("gid".to_owned(), gid.metrics_id()));
161-
162-
let unit: Option<metrics::Unit> = unit.and_then(metrics::Unit::from_string);
163-
let name = metric_name(name);
164-
let description = description.unwrap_or_default();
165-
166-
match unit {
167-
Some(unit) => {
168-
describe_histogram!(name.clone(), unit, description);
169-
}
170-
None => {
171-
describe_histogram!(name.clone(), description);
172-
}
173-
};
174-
175-
Self(metrics::histogram!(name, &labels[..]))
176-
}
177-
178-
#[allow(unused)]
179-
pub(crate) fn record(&self, value: f64) {
180-
self.0.record(value)
181-
}
182-
}
183-
184-
/// Metric of type `Counter`.
185-
///
186-
/// Counters represent a single monotonic value, which means the value can only be incremented,
187-
/// not decremented, and always starts out with an initial value of zero.
188-
#[derive(Clone, size_of::SizeOf)]
189-
pub(crate) struct Counter(#[size_of(skip)] metrics::Counter);
190-
191-
impl Counter {
192-
/// Describe and initialize a new [`Counter`].
193-
///
194-
/// The following labels are set to this counter by default:
195-
/// `worker` => Appropriate runtime worker thread index
196-
/// `gid` => The global node id of the current operator
197-
#[allow(unused)]
198-
pub(crate) fn new(
199-
name: &str,
200-
description: Option<String>,
201-
unit: Option<&str>,
202-
gid: &GlobalNodeId,
203-
mut labels: Vec<(String, String)>,
204-
) -> Self {
205-
labels.push(("worker".to_owned(), Runtime::worker_index().to_string()));
206-
labels.push(("gid".to_owned(), gid.metrics_id()));
207-
208-
let unit: Option<metrics::Unit> = unit.and_then(metrics::Unit::from_string);
209-
let name = metric_name(name);
210-
let description = description.unwrap_or_default();
211-
212-
match unit {
213-
Some(unit) => {
214-
describe_counter!(name.clone(), unit, description);
215-
}
216-
None => {
217-
describe_counter!(name.clone(), description);
218-
}
219-
};
220-
221-
Self(metrics::counter!(name, &labels[..]))
222-
}
223-
224-
/// Increments this [`Counter`] by `value`.
225-
#[allow(unused)]
226-
pub(crate) fn increment(&self, value: u64) {
227-
self.0.increment(value)
228-
}
229-
}
230-
23173
/// Adds descriptions for the metrics we expose.
23274
pub(crate) fn describe_metrics() {
23375
// Storage backend metrics.

crates/dbsp/src/circuit/operator_traits.rs

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,6 @@ pub trait Operator: 'static {
3232
/// Initialize the operator
3333
fn init(&mut self, _global_id: &GlobalNodeId) {}
3434

35-
/// Reports metrics about this operator
36-
fn metrics(&self) {}
37-
3835
/// Collects metadata about the current operator
3936
fn metadata(&self, _meta: &mut OperatorMeta) {}
4037

0 commit comments

Comments
 (0)