Skip to content

Commit a32664c

Browse files
committed
Metrics refinements in variable naming and function documentation.
Suggested by Mihai. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent c30e872 commit a32664c

8 files changed

Lines changed: 59 additions & 48 deletions

File tree

crates/adapters/src/controller/mod.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ use crossbeam::{
4343
};
4444
use datafusion::prelude::*;
4545
use dbsp::circuit::metrics::{
46-
COMPACTION_STALL_TIME, DBSP_OPERATOR_COMMIT_LATENCY, DBSP_STEP, DBSP_STEP_LATENCY,
47-
FILES_CREATED, FILES_DELETED, TOTAL_LATE_RECORDS,
46+
COMPACTION_STALL_TIME_NANOSECONDS, DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS, DBSP_STEP,
47+
DBSP_STEP_LATENCY_MICROSECONDS, FILES_CREATED, FILES_DELETED, TOTAL_LATE_RECORDS,
4848
};
4949
use dbsp::circuit::tokio::TOKIO;
5050
use dbsp::circuit::{CircuitStorageConfig, DevTweaks, Mode};
@@ -61,7 +61,8 @@ use feldera_ir::LirCircuit;
6161
use feldera_storage::checkpoint_synchronizer::CheckpointSynchronizer;
6262
use feldera_storage::histogram::ExponentialHistogram;
6363
use feldera_storage::metrics::{
64-
READ_BLOCKS, READ_LATENCY, SYNC_LATENCY, WRITE_BLOCKS, WRITE_LATENCY,
64+
READ_BLOCKS_BYTES, READ_LATENCY_MICROSECONDS, SYNC_LATENCY_MICROSECONDS, WRITE_BLOCKS_BYTES,
65+
WRITE_LATENCY_MICROSECONDS,
6566
};
6667
use feldera_types::checkpoint::CheckpointMetadata;
6768
use feldera_types::format::json::JsonLines;
@@ -724,7 +725,7 @@ impl Controller {
724725
"compaction_stall_duration_seconds",
725726
"Time in seconds a worker was stalled waiting for more merges to complete.",
726727
labels,
727-
COMPACTION_STALL_TIME.load(Ordering::Relaxed) as f64 / 1_000_000_000.0,
728+
COMPACTION_STALL_TIME_NANOSECONDS.load(Ordering::Relaxed) as f64 / 1_000_000_000.0,
728729
);
729730
metrics.counter(
730731
"files_created_total",
@@ -748,12 +749,12 @@ impl Controller {
748749
"dbsp_step_latency_seconds",
749750
"Latency of DBSP steps over the last 60 seconds or 1000 steps, whichever is less, in seconds",
750751
labels,
751-
&HistogramDiv::new(DBSP_STEP_LATENCY.lock().unwrap().snapshot(), 1_000_000.0));
752+
&HistogramDiv::new(DBSP_STEP_LATENCY_MICROSECONDS.lock().unwrap().snapshot(), 1_000_000.0));
752753
metrics.histogram(
753754
"dbsp_operator_checkpoint_latency_seconds",
754755
"Latency of individual operator checkpoint operations in seconds. (Because checkpoints run in parallel across workers, these will not add to `feldera_checkpoint_latency_seconds`.)",
755756
labels,
756-
&HistogramDiv::new(DBSP_OPERATOR_COMMIT_LATENCY.snapshot(), 1_000_000.0),
757+
&HistogramDiv::new(DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS.snapshot(), 1_000_000.0),
757758
);
758759

759760
metrics.histogram(
@@ -779,32 +780,32 @@ impl Controller {
779780
"storage_read_latency_seconds",
780781
"Read latency for storage blocks in seconds",
781782
labels,
782-
&HistogramDiv::new(READ_LATENCY.snapshot(), 1_000_000.0),
783+
&HistogramDiv::new(READ_LATENCY_MICROSECONDS.snapshot(), 1_000_000.0),
783784
);
784785
metrics.histogram(
785786
"storage_write_latency_seconds",
786787
"Write latency for storage blocks in seconds",
787788
labels,
788-
&HistogramDiv::new(WRITE_LATENCY.snapshot(), 1_000_000.0),
789+
&HistogramDiv::new(WRITE_LATENCY_MICROSECONDS.snapshot(), 1_000_000.0),
789790
);
790791
metrics.histogram(
791792
"storage_sync_latency_seconds",
792793
"Sync latency in seconds",
793794
labels,
794-
&HistogramDiv::new(SYNC_LATENCY.snapshot(), 1_000_000.0),
795+
&HistogramDiv::new(SYNC_LATENCY_MICROSECONDS.snapshot(), 1_000_000.0),
795796
);
796797

797798
metrics.histogram(
798799
"storage_read_block_bytes",
799800
"Sizes in bytes of blocks read from storage.",
800801
labels,
801-
&READ_BLOCKS.snapshot(),
802+
&READ_BLOCKS_BYTES.snapshot(),
802803
);
803804
metrics.histogram(
804805
"storage_write_block_bytes",
805806
"Sizes in bytes of blocks written to storage.",
806807
labels,
807-
&WRITE_BLOCKS.snapshot(),
808+
&WRITE_BLOCKS_BYTES.snapshot(),
808809
);
809810

810811
fn write_input_metric<F, M>(
@@ -1348,7 +1349,7 @@ impl CircuitThread {
13481349
.status
13491350
.global_metrics
13501351
.num_total_processed_records();
1351-
let written_before = WRITE_BLOCKS.sum();
1352+
let written_before = WRITE_BLOCKS_BYTES.sum();
13521353
let checkpoint = CHECKPOINT_LATENCY.record_callback(|| {
13531354
this.circuit
13541355
.commit_with_metadata(this.step, processed_records)
@@ -1377,7 +1378,7 @@ impl CircuitThread {
13771378
.map_err(Arc::new)
13781379
})
13791380
})?;
1380-
let written_after = WRITE_BLOCKS.sum();
1381+
let written_after = WRITE_BLOCKS_BYTES.sum();
13811382
CHECKPOINT_WRITTEN.record((written_after - written_before) / 1_000_000);
13821383
CHECKPOINT_PROCESSED_RECORDS.store(processed_records, Ordering::Relaxed);
13831384

crates/adapters/src/server/metrics.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ where
296296
///
297297
/// The values have to be specified together in a callback because the
298298
/// [Prometheus exposition format] requires that all of the values for a
299-
/// given metric be written together in one block.
299+
/// given metric be written together in one block.
300300
///
301301
/// [Prometheus exposition format]: https://prometheus.io/docs/instrumenting/exposition_formats/
302302
/// [metric and label naming]: https://prometheus.io/docs/practices/naming/
@@ -316,9 +316,9 @@ where
316316
/// Adds a single `value` of type `value_type`, labeled with `labels`, with
317317
/// the given `name` and `help`.
318318
///
319-
/// This is a convenience function that is only correctly use for a metric
320-
/// with a single value. If the metric might have multiple values, use
321-
/// [values](Self::values) instead.
319+
/// This is a convenience function only for metrics with a single value. If
320+
/// the metric might have multiple values, use [values](Self::values)
321+
/// instead.
322322
pub fn value(
323323
&mut self,
324324
name: &str,
@@ -340,7 +340,7 @@ where
340340
///
341341
/// The values have to be specified together in a callback because the
342342
/// [Prometheus exposition format] requires that all of the values for a
343-
/// given counter be written together in one block.
343+
/// given counter be written together in one block.
344344
///
345345
/// [Prometheus exposition format]: https://prometheus.io/docs/instrumenting/exposition_formats/
346346
/// [metric and label naming]: https://prometheus.io/docs/practices/naming/
@@ -354,8 +354,8 @@ where
354354
/// Adds a single counter `value`, labeled with `labels`, with the given
355355
/// `name` and `help`.
356356
///
357-
/// This is a convenience function that is only correctly use for a counter
358-
/// with a single value. If the counter might have multiple values, use
357+
/// This is a convenience function only for counters with a single value.
358+
/// For counters that can have multiple values, use
359359
/// [counters](Self::counters) instead.
360360
pub fn counter(&mut self, name: &str, help: &str, labels: &LabelStack, value: impl Value) {
361361
self.value(name, help, labels, ValueType::Counter, value);
@@ -371,7 +371,7 @@ where
371371
///
372372
/// The values have to be specified together in a callback because the
373373
/// [Prometheus exposition format] requires that all of the values for a
374-
/// given gauge be written together in one block.
374+
/// given gauge be written together in one block.
375375
///
376376
/// [Prometheus exposition format]: https://prometheus.io/docs/instrumenting/exposition_formats/
377377
/// [metric and label naming]: https://prometheus.io/docs/practices/naming/
@@ -385,9 +385,9 @@ where
385385
/// Adds a single gauge `value`, labeled with `labels`, with the given
386386
/// `name` and `help`.
387387
///
388-
/// This is a convenience function that is only correctly use for a gauge
389-
/// with a single value. If the gauge might have multiple values, use
390-
/// [gauges](Self::gauges) instead.
388+
/// This is a convenience function only for gauges with a single value. For
389+
/// gauges that can have multiple values, use [gauges](Self::gauges)
390+
/// instead.
391391
pub fn gauge(&mut self, name: &str, help: &str, labels: &LabelStack, value: impl Value) {
392392
self.value(name, help, labels, ValueType::Gauge, value);
393393
}
@@ -422,7 +422,7 @@ where
422422
/// given `name` and `help`.
423423
///
424424
/// This is a convenience function that is only correctly used for a
425-
/// histogram with a single value. If the histogram might have multiple
425+
/// histogram with a single value. For histograms that can have multiple
426426
/// values, use [histograms](Self::histograms) instead.
427427
pub fn histogram(
428428
&mut self,
@@ -759,6 +759,7 @@ pub struct HistogramDiv<H> {
759759
impl<H> HistogramDiv<H> {
760760
/// Constructs a new [HistogramDiv].
761761
pub fn new(inner: H, divisor: f64) -> Self {
762+
assert_ne!(divisor, 0.0);
762763
Self { inner, divisor }
763764
}
764765
}

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
cache::{CircuitCache, CircuitStoreMarker},
3535
fingerprinter::Fingerprinter,
3636
metadata::OperatorMeta,
37-
metrics::DBSP_OPERATOR_COMMIT_LATENCY,
37+
metrics::DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS,
3838
operator_traits::{
3939
BinaryOperator, BinarySinkOperator, Data, ImportOperator, NaryOperator,
4040
QuaternaryOperator, SinkOperator, SourceOperator, StrictUnaryOperator, TernaryOperator,
@@ -6085,7 +6085,7 @@ impl CircuitHandle {
60856085

60866086
self.circuit
60876087
.map_nodes_recursive_mut(&mut |node: &mut dyn Node| {
6088-
DBSP_OPERATOR_COMMIT_LATENCY.record_callback(|| node.commit(base))
6088+
DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS.record_callback(|| node.commit(base))
60896089
})
60906090
}
60916091

crates/dbsp/src/circuit/dbsp_handle.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::circuit::checkpointer::Checkpointer;
2-
use crate::circuit::metrics::{DBSP_STEP, DBSP_STEP_LATENCY};
2+
use crate::circuit::metrics::{DBSP_STEP, DBSP_STEP_LATENCY_MICROSECONDS};
33
use crate::circuit::runtime::ThreadType;
44
use crate::monitor::visual_graph::Graph;
55
use crate::storage::backend::StorageError;
@@ -850,7 +850,10 @@ impl DBSPHandle {
850850
let span = Arc::new(Span::root("step", SpanContext::random()));
851851
let _guard = span.set_local_parent();
852852
let result = self.broadcast_command(Command::Step(span), |_, _| {});
853-
DBSP_STEP_LATENCY.lock().unwrap().record_elapsed(start);
853+
DBSP_STEP_LATENCY_MICROSECONDS
854+
.lock()
855+
.unwrap()
856+
.record_elapsed(start);
854857
if let Some(handle) = self.runtime.as_ref() {
855858
self.runtime_elapsed +=
856859
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;
@@ -875,7 +878,10 @@ impl DBSPHandle {
875878
replay_complete.push(complete);
876879
});
877880

878-
DBSP_STEP_LATENCY.lock().unwrap().record_elapsed(start);
881+
DBSP_STEP_LATENCY_MICROSECONDS
882+
.lock()
883+
.unwrap()
884+
.record_elapsed(start);
879885
if let Some(handle) = self.runtime.as_ref() {
880886
self.runtime_elapsed +=
881887
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;

crates/dbsp/src/circuit/metrics.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub static FILES_CREATED: AtomicU64 = AtomicU64::new(0);
1717
pub static FILES_DELETED: AtomicU64 = AtomicU64::new(0);
1818

1919
/// Time in nanoseconds a worker was stalled waiting for more merges to complete.
20-
pub static COMPACTION_STALL_TIME: AtomicU64 = AtomicU64::new(0);
20+
pub static COMPACTION_STALL_TIME_NANOSECONDS: AtomicU64 = AtomicU64::new(0);
2121

2222
/// Number of records dropped due to LATENESS annotations
2323
pub static TOTAL_LATE_RECORDS: AtomicU64 = AtomicU64::new(0);
@@ -26,8 +26,9 @@ pub static TOTAL_LATE_RECORDS: AtomicU64 = AtomicU64::new(0);
2626
pub static DBSP_STEP: AtomicU64 = AtomicU64::new(0);
2727

2828
/// Latency of recent DBSP steps, in microseconds.
29-
pub static DBSP_STEP_LATENCY: Mutex<SlidingHistogram> =
29+
pub static DBSP_STEP_LATENCY_MICROSECONDS: Mutex<SlidingHistogram> =
3030
Mutex::new(SlidingHistogram::new(1000, Duration::from_secs(60)));
3131

3232
/// Latency of individual operator commits, in microseconds.
33-
pub static DBSP_OPERATOR_COMMIT_LATENCY: ExponentialHistogram = ExponentialHistogram::new();
33+
pub static DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS: ExponentialHistogram =
34+
ExponentialHistogram::new();

crates/dbsp/src/storage/backend/posixio_impl.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use crate::circuit::metrics::{FILES_CREATED, FILES_DELETED};
88
use crate::storage::{buffer_cache::FBuf, init};
99
use crate::Runtime;
1010
use feldera_storage::metrics::{
11-
READ_BLOCKS, READ_LATENCY, SYNC_LATENCY, WRITE_BLOCKS, WRITE_LATENCY,
11+
READ_BLOCKS_BYTES, READ_LATENCY_MICROSECONDS, SYNC_LATENCY_MICROSECONDS, WRITE_BLOCKS_BYTES,
12+
WRITE_LATENCY_MICROSECONDS,
1213
};
1314
use feldera_storage::tokio::TOKIO;
1415
use feldera_storage::{
@@ -98,8 +99,8 @@ impl FileReader for PosixReader {
9899
}
99100

100101
fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, StorageError> {
101-
READ_BLOCKS.record(location.size);
102-
READ_LATENCY.record_callback(|| {
102+
READ_BLOCKS_BYTES.record(location.size);
103+
READ_LATENCY_MICROSECONDS.record_callback(|| {
103104
sleep(self.ioop_delay);
104105
let mut buffer = FBuf::with_capacity(location.size);
105106

@@ -125,15 +126,15 @@ impl FileReader for PosixReader {
125126
let blocks = blocks
126127
.into_iter()
127128
.map(|location| {
128-
READ_BLOCKS.record(location.size);
129+
READ_BLOCKS_BYTES.record(location.size);
129130
let mut buffer = FBuf::with_capacity(location.size);
130131
match buffer.read_exact_at(&file, location.offset, location.size) {
131132
Ok(()) => Ok(Arc::new(buffer)),
132133
Err(e) => Err(e.into()),
133134
}
134135
})
135136
.collect();
136-
READ_LATENCY.record_elapsed(start);
137+
READ_LATENCY_MICROSECONDS.record_elapsed(start);
137138
callback(blocks);
138139
});
139140
} else {
@@ -221,7 +222,7 @@ impl FileWriter for PosixWriter {
221222
self.flush()?;
222223
}
223224

224-
SYNC_LATENCY.record_callback(|| {
225+
SYNC_LATENCY_MICROSECONDS.record_callback(|| {
225226
self.file.sync_all()?;
226227

227228
// Remove the .mut extension from the file.
@@ -264,7 +265,7 @@ impl PosixWriter {
264265
}
265266

266267
fn flush(&mut self) -> Result<(), IoError> {
267-
WRITE_LATENCY.record_callback(|| {
268+
WRITE_LATENCY_MICROSECONDS.record_callback(|| {
268269
if let Some(storage_mb_max) = Runtime::with_dev_tweaks(|tweaks| tweaks.storage_mb_max) {
269270
let usage_mb = (self.drop.usage.load(Ordering::Relaxed) / 1024 / 1024)
270271
.max(0)
@@ -282,7 +283,7 @@ impl PosixWriter {
282283
let mut cursor = bufs.as_mut_slice();
283284
while !cursor.is_empty() {
284285
let n = self.file.write_vectored(cursor)?;
285-
WRITE_BLOCKS.record(n);
286+
WRITE_BLOCKS_BYTES.record(n);
286287
self.drop.size += n as u64;
287288
self.drop.usage.fetch_add(n as i64, Ordering::Relaxed);
288289
IoSlice::advance_slices(&mut cursor, n);

crates/dbsp/src/trace/spine_async/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use crate::{
1010
circuit::{
1111
metadata::{MetaItem, OperatorMeta},
12-
metrics::COMPACTION_STALL_TIME,
12+
metrics::COMPACTION_STALL_TIME_NANOSECONDS,
1313
},
1414
dynamic::{DynVec, Factory, Weight},
1515
storage::buffer_cache::CacheStats,
@@ -440,7 +440,8 @@ where
440440
let start = Instant::now();
441441
let mut state = self.no_backpressure.wait(state).unwrap();
442442
state.spine_stats.backpressure_wait += start.elapsed();
443-
COMPACTION_STALL_TIME.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
443+
COMPACTION_STALL_TIME_NANOSECONDS
444+
.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
444445
}
445446
}
446447

crates/storage/src/metrics.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use crate::histogram::ExponentialHistogram;
22

33
/// Histogram of read latency, in microseconds.
4-
pub static READ_LATENCY: ExponentialHistogram = ExponentialHistogram::new();
4+
pub static READ_LATENCY_MICROSECONDS: ExponentialHistogram = ExponentialHistogram::new();
55

66
/// Histogram of write latency, in microseconds.
7-
pub static WRITE_LATENCY: ExponentialHistogram = ExponentialHistogram::new();
7+
pub static WRITE_LATENCY_MICROSECONDS: ExponentialHistogram = ExponentialHistogram::new();
88

99
/// Histogram of latency syncing to stable storage, in microseconds.
10-
pub static SYNC_LATENCY: ExponentialHistogram = ExponentialHistogram::new();
10+
pub static SYNC_LATENCY_MICROSECONDS: ExponentialHistogram = ExponentialHistogram::new();
1111

1212
/// Histogram of read block sizes, in bytes.
13-
pub static READ_BLOCKS: ExponentialHistogram = ExponentialHistogram::new();
13+
pub static READ_BLOCKS_BYTES: ExponentialHistogram = ExponentialHistogram::new();
1414

1515
/// Histogram of write block sizes, in bytes.
16-
pub static WRITE_BLOCKS: ExponentialHistogram = ExponentialHistogram::new();
16+
pub static WRITE_BLOCKS_BYTES: ExponentialHistogram = ExponentialHistogram::new();

0 commit comments

Comments
 (0)