Skip to content

Commit 27cc389

Browse files
committed
dbsp: Better stats for filters.
We add stats for the range filter. This lead to some refactoring: Since we now have two filters (with another one on the way) we consolidate the stats into a single struct that can be re-used across filters. It also revealed a performance issue with the current filter stats. Because this function is extremly hot adding the hit and miss atomics in CachePadded led to a 25% increase for the ingest benchmark. Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 2444f32 commit 27cc389

File tree

23 files changed

+393
-135
lines changed

23 files changed

+393
-135
lines changed

crates/dbsp/src/circuit/metadata.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ pub const BLOOM_FILTER_MISSES_COUNT: MetricId =
135135
pub const BLOOM_FILTER_HIT_RATE_PERCENT: MetricId =
136136
MetricId(Cow::Borrowed("bloom_filter_hit_rate_percent"));
137137
pub const BLOOM_FILTER_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("bloom_filter_size_bytes"));
138+
pub const RANGE_FILTER_HITS_COUNT: MetricId = MetricId(Cow::Borrowed("range_filter_hits_count"));
139+
pub const RANGE_FILTER_MISSES_COUNT: MetricId =
140+
MetricId(Cow::Borrowed("range_filter_misses_count"));
141+
pub const RANGE_FILTER_HIT_RATE_PERCENT: MetricId =
142+
MetricId(Cow::Borrowed("range_filter_hit_rate_percent"));
143+
pub const RANGE_FILTER_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("range_filter_size_bytes"));
138144
pub const SPINE_BATCHES_COUNT: MetricId = MetricId(Cow::Borrowed("spine_batches_count"));
139145
pub const SPINE_STORAGE_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("spine_storage_size_bytes"));
140146
pub const MERGING_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("merging_size_bytes"));
@@ -160,7 +166,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche
160166
pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId =
161167
MetricId(Cow::Borrowed("input_integral_records_count"));
162168

163-
pub const CIRCUIT_METRICS: [CircuitMetric; 65] = [
169+
pub const CIRCUIT_METRICS: [CircuitMetric; 69] = [
164170
// State
165171
CircuitMetric {
166172
name: USED_MEMORY_BYTES,
@@ -282,6 +288,30 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 65] = [
282288
advanced: false,
283289
description: "Hit rate of the Bloom filter.",
284290
},
291+
CircuitMetric {
292+
name: RANGE_FILTER_SIZE_BYTES,
293+
category: CircuitMetricCategory::State,
294+
advanced: false,
295+
description: "Size of the cached range filter in bytes.",
296+
},
297+
CircuitMetric {
298+
name: RANGE_FILTER_HITS_COUNT,
299+
category: CircuitMetricCategory::State,
300+
advanced: false,
301+
description: "The number of hits across all range filters. The hits are summed across the range filters for all batches in the spine.",
302+
},
303+
CircuitMetric {
304+
name: RANGE_FILTER_MISSES_COUNT,
305+
category: CircuitMetricCategory::State,
306+
advanced: false,
307+
description: "The number of misses across all range filters. The misses are summed across the range filters for all batches in the spine.",
308+
},
309+
CircuitMetric {
310+
name: RANGE_FILTER_HIT_RATE_PERCENT,
311+
category: CircuitMetricCategory::State,
312+
advanced: false,
313+
description: "Hit rate of the range filter.",
314+
},
285315
CircuitMetric {
286316
name: RETAINMENT_BOUNDS,
287317
category: CircuitMetricCategory::State,

crates/dbsp/src/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod backend;
77
pub mod buffer_cache;
88
pub mod dirlock;
99
pub mod file;
10+
pub mod filter_stats;
1011
pub mod tracking_bloom_filter;
1112

1213
use fdlimit::{Outcome::LimitRaised, raise_fd_limit};

crates/dbsp/src/storage/file/reader.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use super::{AnyFactories, Deserializer, Factories};
77
use crate::dynamic::{DynVec, WeightTrait};
88
use crate::storage::buffer_cache::CacheAccess;
99
use crate::storage::file::format::FilterBlock;
10-
use crate::storage::tracking_bloom_filter::{BloomFilterStats, TrackingBloomFilter};
10+
use crate::storage::filter_stats::FilterStats;
11+
use crate::storage::tracking_bloom_filter::TrackingBloomFilter;
1112
use crate::storage::{
1213
backend::StorageError,
1314
buffer_cache::{BufferCache, FBuf},
@@ -1682,11 +1683,11 @@ where
16821683
/// Returns statistics of the Bloom filter, including its size in bytes.
16831684
///
16841685
/// If the file doesn't have a Bloom filter, returns a default of zeros.
1685-
pub fn filter_stats(&self) -> BloomFilterStats {
1686+
pub fn filter_stats(&self) -> FilterStats {
16861687
if let Some(bloom_filter) = &self.bloom_filter {
16871688
bloom_filter.stats()
16881689
} else {
1689-
BloomFilterStats::default()
1690+
FilterStats::default()
16901691
}
16911692
}
16921693

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crossbeam::utils::CachePadded;
2+
use std::iter::Sum;
3+
use std::ops::{Add, AddAssign};
4+
use std::sync::atomic::{AtomicUsize, Ordering};
5+
6+
/// Statistics about an in-memory key filter.
7+
///
8+
/// The statistics implement addition such that they can be summed across
9+
/// batches. Their addition loses information about individual sizes, hits,
10+
/// misses and by extension, hit rates.
11+
#[derive(Clone, Copy, Debug, Default, PartialEq)]
12+
pub struct FilterStats {
13+
/// Filter size in bytes.
14+
pub size_byte: usize,
15+
/// Number of hits.
16+
pub hits: usize,
17+
/// Number of misses.
18+
pub misses: usize,
19+
}
20+
21+
impl Add for FilterStats {
22+
type Output = Self;
23+
24+
fn add(mut self, rhs: Self) -> Self::Output {
25+
self.add_assign(rhs);
26+
self
27+
}
28+
}
29+
30+
impl AddAssign for FilterStats {
31+
fn add_assign(&mut self, rhs: Self) {
32+
self.size_byte += rhs.size_byte;
33+
self.hits += rhs.hits;
34+
self.misses += rhs.misses;
35+
}
36+
}
37+
38+
impl Sum for FilterStats {
39+
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
40+
iter.fold(Self::default(), Add::add)
41+
}
42+
}
43+
44+
/// Shared hit/miss accounting for key filters.
45+
#[derive(Debug)]
46+
pub struct TrackingFilterStats {
47+
size_byte: usize,
48+
hits: CachePadded<AtomicUsize>,
49+
misses: CachePadded<AtomicUsize>,
50+
}
51+
52+
impl TrackingFilterStats {
53+
/// Creates tracking state for a filter of the given size.
54+
pub fn new(size_byte: usize) -> Self {
55+
Self {
56+
size_byte,
57+
hits: CachePadded::new(AtomicUsize::new(0)),
58+
misses: CachePadded::new(AtomicUsize::new(0)),
59+
}
60+
}
61+
62+
/// Retrieves statistics.
63+
pub fn stats(&self) -> FilterStats {
64+
FilterStats {
65+
size_byte: self.size_byte,
66+
hits: self.hits.load(Ordering::Relaxed),
67+
misses: self.misses.load(Ordering::Relaxed),
68+
}
69+
}
70+
71+
/// Records the result of one filter probe.
72+
pub fn record(&self, is_hit: bool) {
73+
if is_hit {
74+
self.hits.fetch_add(1, Ordering::Relaxed);
75+
} else {
76+
self.misses.fetch_add(1, Ordering::Relaxed);
77+
}
78+
}
79+
}

crates/dbsp/src/storage/tracking_bloom_filter.rs

Lines changed: 17 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,29 @@
1+
use crate::storage::filter_stats::{FilterStats, TrackingFilterStats};
12
use fastbloom::BloomFilter;
2-
use std::iter::Sum;
3-
use std::ops::{Add, AddAssign};
4-
use std::sync::atomic::{AtomicUsize, Ordering};
53

64
/// Bloom filter which tracks the number of hits and misses when lookups are performed.
75
/// It implements the subset of [`BloomFilter`] functions that are used by Feldera storage.
86
#[derive(Debug)]
97
pub struct TrackingBloomFilter {
108
/// Underlying Bloom filter.
119
bloom_filter: BloomFilter,
12-
/// Number of hits.
13-
hits: AtomicUsize,
14-
/// Number of misses.
15-
misses: AtomicUsize,
16-
}
17-
18-
/// Statistics about the Bloom filter.
19-
///
20-
/// The statistics implement addition such that they can be summed (e.g., when you have a spine
21-
/// consisting out of multiple batches, each with their own Bloom filter). However, their addition
22-
/// will lose information about individual sizes, hits, misses and by extension, hit rates.
23-
#[derive(Clone, Copy, Debug, Default, PartialEq)]
24-
pub struct BloomFilterStats {
25-
/// Bloom filter size in bytes.
26-
pub size_byte: usize,
27-
/// Number of hits.
28-
pub hits: usize,
29-
/// Number of misses.
30-
pub misses: usize,
31-
}
32-
33-
impl Add for BloomFilterStats {
34-
type Output = Self;
35-
36-
fn add(mut self, rhs: Self) -> Self::Output {
37-
self.add_assign(rhs);
38-
self
39-
}
40-
}
41-
42-
impl AddAssign for BloomFilterStats {
43-
fn add_assign(&mut self, rhs: Self) {
44-
self.size_byte += rhs.size_byte;
45-
self.hits += rhs.hits;
46-
self.misses += rhs.misses;
47-
}
48-
}
49-
50-
impl Sum for BloomFilterStats {
51-
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
52-
iter.fold(Self::default(), Add::add)
53-
}
10+
tracking: TrackingFilterStats,
5411
}
5512

5613
impl TrackingBloomFilter {
5714
/// Constructs a tracking Bloom filter which wraps a regular Bloom filter instance.
5815
/// It is assumed the underlying Bloom filter has not yet been used.
5916
pub fn new(bloom_filter: BloomFilter) -> Self {
17+
let size_byte = size_of_val(&bloom_filter) + bloom_filter.num_bits() / 8;
6018
Self {
19+
tracking: TrackingFilterStats::new(size_byte),
6120
bloom_filter,
62-
hits: AtomicUsize::new(0),
63-
misses: AtomicUsize::new(0),
6421
}
6522
}
6623

6724
/// Retrieves statistics.
68-
pub fn stats(&self) -> BloomFilterStats {
69-
BloomFilterStats {
70-
size_byte: size_of_val(&self.bloom_filter) + self.bloom_filter.num_bits() / 8,
71-
hits: self.hits.load(Ordering::Acquire),
72-
misses: self.misses.load(Ordering::Acquire),
73-
}
25+
pub fn stats(&self) -> FilterStats {
26+
self.tracking.stats()
7427
}
7528

7629
/// See [`BloomFilter::num_hashes`].
@@ -92,18 +45,15 @@ impl TrackingBloomFilter {
9245
/// It additionally counts the hits or misses, before returning.
9346
pub fn contains_hash(&self, hash: u64) -> bool {
9447
let is_hit = self.bloom_filter.contains_hash(hash);
95-
if is_hit {
96-
self.hits.fetch_add(1, Ordering::Release);
97-
} else {
98-
self.misses.fetch_add(1, Ordering::Release);
99-
}
48+
self.tracking.record(is_hit);
10049
is_hit
10150
}
10251
}
10352

10453
#[cfg(test)]
10554
mod tests {
106-
use super::{BloomFilterStats, TrackingBloomFilter};
55+
use super::TrackingBloomFilter;
56+
use crate::storage::filter_stats::FilterStats;
10757
use fastbloom::BloomFilter;
10858

10959
#[test]
@@ -114,7 +64,7 @@ mod tests {
11464
assert!(filter.num_hashes() >= 1);
11565
assert_eq!(
11666
filter.stats(),
117-
BloomFilterStats {
67+
FilterStats {
11868
size_byte: 96 + 8192 / 8,
11969
hits: 0,
12070
misses: 0,
@@ -126,7 +76,7 @@ mod tests {
12676
assert!(!filter.contains_hash(789));
12777
assert_eq!(
12878
filter.stats(),
129-
BloomFilterStats {
79+
FilterStats {
13080
size_byte: 96 + 8192 / 8,
13181
hits: 1,
13282
misses: 2,
@@ -137,8 +87,8 @@ mod tests {
13787
#[test]
13888
fn tracking_bloom_filter_stats_default() {
13989
assert_eq!(
140-
BloomFilterStats::default(),
141-
BloomFilterStats {
90+
FilterStats::default(),
91+
FilterStats {
14292
size_byte: 0,
14393
hits: 0,
14494
misses: 0,
@@ -148,24 +98,24 @@ mod tests {
14898

14999
#[test]
150100
fn tracking_bloom_filter_stats_addition() {
151-
let stats1 = BloomFilterStats {
101+
let stats1 = FilterStats {
152102
size_byte: 123,
153103
hits: 456,
154104
misses: 789,
155105
};
156-
let stats2 = BloomFilterStats {
106+
let stats2 = FilterStats {
157107
size_byte: 100,
158108
hits: 200,
159109
misses: 300,
160110
};
161-
let stats3 = BloomFilterStats {
111+
let stats3 = FilterStats {
162112
size_byte: 223,
163113
hits: 656,
164114
misses: 1089,
165115
};
166116
assert_eq!(stats1 + stats2, stats3);
167117
assert_eq!(
168-
vec![stats1, stats2].into_iter().sum::<BloomFilterStats>(),
118+
vec![stats1, stats2].into_iter().sum::<FilterStats>(),
169119
stats3
170120
);
171121
}

crates/dbsp/src/trace.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ pub use ord::{
7272

7373
use rkyv::{Deserialize, archived_root};
7474

75-
use crate::storage::tracking_bloom_filter::BloomFilterStats;
7675
use crate::{
7776
Error, NumEntries, Timestamp,
7877
algebra::MonoidValue,
7978
dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
8079
storage::file::reader::Error as ReaderError,
80+
storage::filter_stats::FilterStats,
8181
};
8282
pub use cursor::{Cursor, MergeCursor};
8383
pub use filter::{Filter, GroupFilter};
@@ -469,14 +469,20 @@ where
469469
/// the implementation need not attempt to cache the return value.
470470
fn approximate_byte_size(&self) -> usize;
471471

472-
/// Statistics of the Bloom filter used by [Cursor::seek_key_exact].
473-
/// The Bloom filter (kept in memory) is used there to quickly check
474-
/// whether a key might be present in the batch, before doing a
475-
/// binary tree lookup within the batch to be exactly sure.
476-
/// The statistics include for example the size in bytes and the hit rate.
477-
/// Only some kinds of batches use a filter; others should return
478-
/// `BloomFilterStats::default()`.
479-
fn filter_stats(&self) -> BloomFilterStats;
472+
/// Statistics of the secondary key filter used by [Cursor::seek_key_exact]
473+
/// after the range filter.
474+
///
475+
/// Today this is usually a Bloom filter. Batches without such a filter
476+
/// should return `FilterStats::default()`.
477+
fn filter_stats(&self) -> FilterStats;
478+
479+
/// Statistics of the in-memory range filter used by
480+
/// [Cursor::seek_key_exact].
481+
///
482+
/// Batches without a range filter should return `FilterStats::default()`.
483+
fn range_filter_stats(&self) -> FilterStats {
484+
FilterStats::default()
485+
}
480486

481487
/// Where the batch's data is stored.
482488
fn location(&self) -> BatchLocation {
@@ -657,9 +663,12 @@ where
657663
fn approximate_byte_size(&self) -> usize {
658664
(**self).approximate_byte_size()
659665
}
660-
fn filter_stats(&self) -> BloomFilterStats {
666+
fn filter_stats(&self) -> FilterStats {
661667
(**self).filter_stats()
662668
}
669+
fn range_filter_stats(&self) -> FilterStats {
670+
(**self).range_filter_stats()
671+
}
663672
fn location(&self) -> BatchLocation {
664673
(**self).location()
665674
}

0 commit comments

Comments
 (0)