Skip to content

Commit c00a5f4

Browse files
committed
[storage] Add sliding histogram metric.
This is useful for tracking recent history of a metric. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 0db5b40 commit c00a5f4

1 file changed

Lines changed: 102 additions & 1 deletion

File tree

crates/storage/src/histogram.rs

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::{
2+
collections::VecDeque,
23
ops::RangeInclusive,
34
sync::atomic::{AtomicU64, Ordering},
4-
time::Instant,
5+
time::{Duration, Instant},
56
};
67

78
use itertools::Itertools;
@@ -67,6 +68,8 @@ impl ExponentialHistogram {
6768
}
6869
}
6970

71+
/// Calls `f` and records the amount of time that it takes to run, in
72+
/// microseconds, in the histogram.
7073
pub fn record_callback<F, T>(&self, f: F) -> T
7174
where
7275
F: FnOnce() -> T,
@@ -159,3 +162,101 @@ fn bucket_to_range(bucket: usize) -> RangeInclusive<u64> {
159162
_ => unreachable!(),
160163
}
161164
}
165+
166+
/// A sliding histogram with exponential buckets.
167+
///
168+
/// This histogram records up to a specified number of samples across a
169+
/// specified maximum amount of time. Within that range, it maintains an
170+
/// exponential histogram with the same form as [ExponentialHistogram].
171+
#[derive(Debug)]
172+
pub struct SlidingHistogram {
173+
buckets: [u64; N_BUCKETS],
174+
samples: VecDeque<Sample>,
175+
sum: u64,
176+
max_samples: usize,
177+
max_elapsed: Duration,
178+
}
179+
180+
/// A single observation retained by a [`SlidingHistogram`].
#[derive(Debug)]
struct Sample {
    /// The instant at which the value was recorded.
    time: Instant,
    /// The recorded value.
    value: u64,
}
185+
186+
impl SlidingHistogram {
187+
/// Constructs a new sliding histogram. The histogram will keep at most the
188+
/// most recent `max_samples` samples that have been recorded over at most
189+
/// the most recent `max_elapsed` amount of time.
190+
pub const fn new(max_samples: usize, max_elapsed: Duration) -> Self {
191+
Self {
192+
buckets: [0; N_BUCKETS],
193+
samples: VecDeque::new(),
194+
sum: 0,
195+
max_samples,
196+
max_elapsed,
197+
}
198+
}
199+
200+
/// Records `value` in the histogram.
201+
pub fn record(&mut self, value: impl TryInto<u64>) {
202+
if let Ok(value) = value.try_into() {
203+
if self.samples.len() >= self.max_samples {
204+
self.drop_sample();
205+
}
206+
self.samples.push_back(Sample {
207+
time: Instant::now(),
208+
value,
209+
});
210+
self.buckets[number_to_bucket(value)] += 1;
211+
self.sum += value;
212+
}
213+
}
214+
215+
/// Records the time elapsed since `start` in the histogram, as a count of
216+
/// microseconds.
217+
pub fn record_elapsed(&mut self, start: Instant) {
218+
self.record(start.elapsed().as_micros());
219+
}
220+
221+
/// Returns a snapshot of the histogram.
222+
pub fn snapshot(&mut self) -> ExponentialHistogramSnapshot {
223+
// Drop samples that are more than `max_elapsed` older than the most
224+
// recent sample.
225+
//
226+
// (We use the most recent sample instead of the current time to avoid
227+
// dropping all the samples if nothing has been recorded recently.)
228+
if let Some(most_recent) = self.samples.back() {
229+
let cutoff = most_recent.time - self.max_elapsed;
230+
while self
231+
.samples
232+
.front()
233+
.is_some_and(|sample| sample.time < cutoff)
234+
{
235+
self.drop_sample();
236+
}
237+
}
238+
239+
ExponentialHistogramSnapshot {
240+
buckets: self.buckets,
241+
sum: self.sum,
242+
}
243+
}
244+
245+
/// Calls `f` and records the amount of time that it takes to run, in
246+
/// microseconds, in the histogram.
247+
pub fn record_callback<F, T>(&mut self, f: F) -> T
248+
where
249+
F: FnOnce() -> T,
250+
{
251+
let start = Instant::now();
252+
let retval = f();
253+
self.record_elapsed(start);
254+
retval
255+
}
256+
257+
fn drop_sample(&mut self) {
258+
let sample = self.samples.pop_front().unwrap();
259+
self.buckets[number_to_bucket(sample.value)] -= 1;
260+
self.sum -= sample.value;
261+
}
262+
}

0 commit comments

Comments
 (0)