Skip to content

Commit 998fc38

Browse files
committed
dbsp: add roaring-based file filters
1 parent 539f122 commit 998fc38

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

46 files changed

+2408
-392
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ reqwest-websocket = "0.5.0"
217217
rkyv = { version = "0.7.45", default-features = false }
218218
rmp-serde = "1.3.0"
219219
rmpv = "1.3.0"
220+
roaring = "0.11.3"
220221
rstest = "0.15"
221222
# Make sure this is the same rustls version used by the `tonic` crate.
222223
# See the `ensure_default_crypto_provider` function.

crates/dbsp/src/circuit/metadata.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche
167167
pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId =
168168
MetricId(Cow::Borrowed("input_integral_records_count"));
169169

170-
pub const CIRCUIT_METRICS: [CircuitMetric; 70] = [
170+
pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [
171171
// State
172172
CircuitMetric {
173173
name: USED_MEMORY_BYTES,
@@ -269,7 +269,7 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 70] = [
269269
name: BLOOM_FILTER_BITS_PER_KEY,
270270
category: CircuitMetricCategory::State,
271271
advanced: false,
272-
description: "Average number of bits per key in the Bloom filter.",
272+
description: "Average number of bits per key across batches that use a Bloom filter.",
273273
},
274274
CircuitMetric {
275275
name: BLOOM_FILTER_SIZE_BYTES,
@@ -295,6 +295,30 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 70] = [
295295
advanced: false,
296296
description: "Hit rate of the Bloom filter.",
297297
},
298+
CircuitMetric {
299+
name: ROARING_FILTER_SIZE_BYTES,
300+
category: CircuitMetricCategory::State,
301+
advanced: false,
302+
description: "Size of the bitmap filter in bytes.",
303+
},
304+
CircuitMetric {
305+
name: ROARING_FILTER_HITS_COUNT,
306+
category: CircuitMetricCategory::State,
307+
advanced: false,
308+
description: "The number of hits across all bitmap filters. The hits are summed across the bitmap filters for all batches in the spine.",
309+
},
310+
CircuitMetric {
311+
name: ROARING_FILTER_MISSES_COUNT,
312+
category: CircuitMetricCategory::State,
313+
advanced: false,
314+
description: "The number of misses across all bitmap filters. The misses are summed across the bitmap filters for all batches in the spine.",
315+
},
316+
CircuitMetric {
317+
name: ROARING_FILTER_HIT_RATE_PERCENT,
318+
category: CircuitMetricCategory::State,
319+
advanced: false,
320+
description: "Hit rate of the bitmap filter.",
321+
},
298322
CircuitMetric {
299323
name: RANGE_FILTER_SIZE_BYTES,
300324
category: CircuitMetricCategory::State,

crates/dbsp/src/dynamic/data.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,24 @@ use crate::{
1212
rkyv::SerializeDyn,
1313
},
1414
hash::default_hash,
15+
utils::SupportsRoaring,
1516
};
1617

1718
/// Defines the minimal set of operations that must be supported by
1819
/// all data types stored in DBSP batches.
1920
///
2021
/// This trait is object safe and can be invoked via dynamic dispatch.
2122
pub trait Data:
22-
Comparable + Clonable + SerializeDyn + DeserializableDyn + Send + Sync + Debug + AsAny + SizeOf
23+
Comparable
24+
+ Clonable
25+
+ SerializeDyn
26+
+ DeserializableDyn
27+
+ Send
28+
+ Sync
29+
+ Debug
30+
+ AsAny
31+
+ SizeOf
32+
+ SupportsRoaring
2333
{
2434
/// Compute a hash of the object using default hasher and seed.
2535
fn default_hash(&self) -> u64;

crates/dbsp/src/storage.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ pub mod backend;
77
pub mod buffer_cache;
88
pub mod dirlock;
99
pub mod file;
10-
pub mod filter_stats;
1110
pub mod tracking_bloom_filter;
1211

1312
use fdlimit::{Outcome::LimitRaised, raise_fd_limit};

crates/dbsp/src/storage/file.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
//! value and for sequential reads. It should be possible to disable indexing
3737
//! by data value for workloads that don't require it.
3838
//!
39-
//! Layer files support approximate set membership query in `~O(1)` time using
40-
//! [a filter block](format::FilterBlock).
39+
//! Layer files support cheap key-membership tests using a per-batch filter
40+
//! block. The default filter is Bloom-based; key types whose per-batch span
41+
//! fits in `u32` can alternatively use an exact roaring bitmap filter by
42+
//! storing keys relative to the batch minimum.
4143
//!
4244
//! Layer files should support 1 TB data size.
4345
//!
@@ -98,6 +100,7 @@ use std::{
98100
use std::{any::Any, sync::Arc};
99101
use std::{fmt::Debug, ptr::NonNull};
100102

103+
mod filter;
101104
pub mod format;
102105
mod item;
103106
pub mod reader;
@@ -108,6 +111,10 @@ use crate::{
108111
dynamic::{DataTrait, Erase, Factory, WithFactory},
109112
storage::file::item::RefTup2Factory,
110113
};
114+
pub use filter::BatchKeyFilter;
115+
pub use filter::FilterPlan;
116+
pub use filter::TrackingRoaringBitmap;
117+
pub use filter::{FilterKind, FilterStats, TrackingFilterStats};
111118
pub use item::{ArchivedItem, Item, ItemFactory, WithItemFactory};
112119

113120
const BLOOM_FILTER_SEED: u128 = 42;
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
mod bloom;
2+
mod roaring;
3+
mod stats;
4+
5+
use crate::{
6+
Runtime,
7+
dynamic::{DataTrait, DynVec},
8+
storage::tracking_bloom_filter::TrackingBloomFilter,
9+
trace::{BatchReader, BatchReaderFactories, sample_keys_from_batches},
10+
};
11+
use dyn_clone::clone_box;
12+
use rand::thread_rng;
13+
use std::io;
14+
15+
pub use roaring::TrackingRoaringBitmap;
16+
pub(crate) use roaring::{
17+
FILTER_PLAN_MIN_SAMPLE_SIZE, FILTER_PLAN_SAMPLE_PERCENT, RoaringLookupSampleStats,
18+
};
19+
pub use stats::{FilterKind, FilterStats, TrackingFilterStats};
20+
21+
/// In-memory representation of the per-batch key filter.
22+
#[derive(Debug)]
23+
pub enum BatchKeyFilter {
24+
/// Probabilistic Bloom filter over key hashes.
25+
Bloom(TrackingBloomFilter),
26+
27+
/// Exact roaring bitmap for key types whose batch's range fits in `u32`.
28+
RoaringU32(TrackingRoaringBitmap),
29+
}
30+
31+
impl BatchKeyFilter {
32+
pub(crate) fn new_bloom(estimated_keys: usize, bloom_false_positive_rate: f64) -> Self {
33+
Self::Bloom(bloom::new_bloom_filter(
34+
estimated_keys,
35+
bloom_false_positive_rate,
36+
))
37+
}
38+
39+
pub(crate) fn new_roaring_u32<K>(min: &K) -> Self
40+
where
41+
K: DataTrait + ?Sized,
42+
{
43+
Self::RoaringU32(TrackingRoaringBitmap::with_min(min))
44+
}
45+
46+
pub(crate) fn deserialize_bloom(num_hashes: u32, data: Vec<u64>) -> Self {
47+
Self::Bloom(bloom::deserialize_bloom_filter(num_hashes, data))
48+
}
49+
50+
pub(crate) fn deserialize_roaring_u32<K>(data: &[u8], min: &K) -> io::Result<Self>
51+
where
52+
K: DataTrait + ?Sized,
53+
{
54+
TrackingRoaringBitmap::deserialize_from(data, min).map(Self::RoaringU32)
55+
}
56+
57+
pub(crate) fn insert_key<K>(&mut self, key: &K)
58+
where
59+
K: DataTrait + ?Sized,
60+
{
61+
match self {
62+
Self::Bloom(filter) => {
63+
filter.insert_hash(key.default_hash());
64+
}
65+
Self::RoaringU32(filter) => {
66+
filter.insert_key(key);
67+
}
68+
}
69+
}
70+
pub(crate) fn finalize(&mut self) {
71+
match self {
72+
Self::Bloom(_) => {}
73+
Self::RoaringU32(filter) => filter.finalize(),
74+
}
75+
}
76+
}
77+
78+
/// Merge-time input used to choose the batch membership filter before writing.
79+
///
80+
/// The writer must know upfront whether it is building Bloom or bitmap state,
81+
/// because it cannot switch filters after the first key is written. The plan
82+
/// therefore bundles:
83+
/// - the merged batch bounds, which tell us whether min-offset roaring fits;
84+
/// - a sampled subset of input keys, which lets us predict lookup behavior
85+
/// when Bloom and roaring are both enabled.
86+
pub struct FilterPlan<K>
87+
where
88+
K: DataTrait + ?Sized,
89+
{
90+
min: Box<K>,
91+
max: Box<K>,
92+
sampled_keys: Option<Box<DynVec<K>>>,
93+
}
94+
95+
impl<K> FilterPlan<K>
96+
where
97+
K: DataTrait + ?Sized,
98+
{
99+
fn sample_count_for_filter_plan(num_keys: usize) -> usize {
100+
let scaled = ((num_keys as f64) * (FILTER_PLAN_SAMPLE_PERCENT / 100.0)).ceil() as usize;
101+
scaled.max(FILTER_PLAN_MIN_SAMPLE_SIZE).min(num_keys)
102+
}
103+
104+
/// Builds a filter plan from the known minimum and maximum batch keys.
105+
pub fn from_bounds(min: &K, max: &K) -> Self {
106+
Self {
107+
min: clone_box(min),
108+
max: clone_box(max),
109+
sampled_keys: None,
110+
}
111+
}
112+
113+
#[cfg(test)]
114+
pub(crate) fn with_sampled_keys(mut self, sampled_keys: Box<DynVec<K>>) -> Self {
115+
self.sampled_keys = Some(sampled_keys);
116+
self
117+
}
118+
119+
pub(crate) fn from_batches<'a, B, I>(batches: I) -> Option<Self>
120+
where
121+
B: BatchReader<Key = K>,
122+
I: IntoIterator<Item = &'a B>,
123+
{
124+
let batches: Vec<&'a B> = batches.into_iter().collect();
125+
let mut bounds: Option<(Box<K>, Box<K>)> = None;
126+
for batch in &batches {
127+
let (batch_min, batch_max) = batch.key_bounds()?;
128+
match bounds.as_mut() {
129+
Some((min, max)) => {
130+
if batch_min < min.as_ref() {
131+
*min = clone_box(batch_min);
132+
}
133+
if batch_max > max.as_ref() {
134+
*max = clone_box(batch_max);
135+
}
136+
}
137+
None => bounds = Some((clone_box(batch_min), clone_box(batch_max))),
138+
}
139+
}
140+
141+
bounds.map(|(min, max)| {
142+
let mut plan = Self {
143+
min,
144+
max,
145+
sampled_keys: None,
146+
};
147+
if plan.roaring_range_fits() {
148+
plan.sampled_keys = Self::collect_sampled_keys_from_batches(&batches);
149+
}
150+
plan
151+
})
152+
}
153+
154+
fn collect_sampled_keys_from_batches<B>(batches: &[&B]) -> Option<Box<DynVec<K>>>
155+
where
156+
B: BatchReader<Key = K>,
157+
{
158+
let first_batch = batches.first()?;
159+
let mut sampled_keys = first_batch.factories().keys_factory().default_box();
160+
let total_sample_size = batches
161+
.iter()
162+
.map(|batch| Self::sample_count_for_filter_plan(batch.key_count()))
163+
.sum::<usize>();
164+
sampled_keys.reserve(total_sample_size);
165+
166+
let mut rng = thread_rng();
167+
sample_keys_from_batches(
168+
&first_batch.factories(),
169+
batches,
170+
&mut rng,
171+
|batch| Self::sample_count_for_filter_plan(batch.key_count()),
172+
sampled_keys.as_mut(),
173+
);
174+
175+
(!sampled_keys.is_empty()).then_some(sampled_keys)
176+
}
177+
178+
fn roaring_range_fits(&self) -> bool {
179+
self.min.supports_roaring32() && self.max.into_roaring_u32(self.min.as_data()).is_some()
180+
}
181+
182+
fn can_use_roaring(&self, enable_roaring: bool) -> bool {
183+
enable_roaring && self.roaring_range_fits()
184+
}
185+
186+
fn predict_lookup_prefers_roaring(&self, estimated_keys: usize) -> bool {
187+
let sampled_keys = match self.sampled_keys.as_ref() {
188+
Some(sampled_keys) => sampled_keys,
189+
None => return false,
190+
};
191+
192+
let mut roaring_keys = Vec::with_capacity(sampled_keys.len());
193+
for index in 0..sampled_keys.len() {
194+
let roaring_key = match sampled_keys
195+
.index(index)
196+
.into_roaring_u32(self.min.as_data())
197+
{
198+
Some(roaring_key) => roaring_key,
199+
None => return false,
200+
};
201+
roaring_keys.push(roaring_key);
202+
}
203+
roaring_keys.sort_unstable();
204+
roaring_keys.dedup();
205+
206+
RoaringLookupSampleStats::from_sample(estimated_keys, &roaring_keys)
207+
.map(|stats| stats.lookup_prefers_roaring())
208+
.unwrap_or(false)
209+
}
210+
211+
fn preferred_filter(
212+
&self,
213+
estimated_keys: usize,
214+
enable_roaring: bool,
215+
bloom_false_positive_rate: f64,
216+
) -> BatchKeyFilter {
217+
if self.can_use_roaring(enable_roaring)
218+
&& self.predict_lookup_prefers_roaring(estimated_keys)
219+
{
220+
BatchKeyFilter::new_roaring_u32(self.min.as_ref())
221+
} else {
222+
BatchKeyFilter::new_bloom(estimated_keys, bloom_false_positive_rate)
223+
}
224+
}
225+
226+
/// Chooses the membership filter to build for a batch with `estimated_keys`
227+
/// rows, using the enabled Bloom/roaring settings and an optional batch
228+
/// bounds plan.
229+
pub fn decide_filter(
230+
filter_plan: Option<&Self>,
231+
estimated_keys: usize,
232+
) -> Option<BatchKeyFilter> {
233+
// Choose between Bloom, roaring, or no membership filter using the
234+
// following rules:
235+
//
236+
// - If Bloom and roaring are both enabled, prefer roaring when the
237+
// plan proves the batch range fits in `u32` and the sampled-key
238+
// lookup predictor says roaring should beat Bloom. If sampling is
239+
// unavailable or the predictor cannot run, fall back to Bloom.
240+
// - If only Bloom is enabled, always build Bloom.
241+
// - If only roaring is enabled, build roaring only when the plan
242+
// proves the batch range fits in `u32`; otherwise build no
243+
// membership filter.
244+
// - If both are disabled, build no membership filter.
245+
//
246+
// The "no plan => no roaring" rule is intentional: without known
247+
// batch bounds we cannot safely decide that min-offset roaring
248+
// encoding will fit, and we do not allow switching filters after
249+
// writing has started.
250+
let enable_roaring = Runtime::with_dev_tweaks(|dev_tweaks| dev_tweaks.enable_roaring());
251+
let bloom_false_positive_rate = Runtime::with_dev_tweaks(|dev_tweaks| {
252+
let rate = dev_tweaks.bloom_false_positive_rate();
253+
(rate > 0.0 && rate < 1.0).then_some(rate)
254+
});
255+
match (bloom_false_positive_rate, filter_plan) {
256+
(Some(rate), Some(filter_plan)) => {
257+
Some(filter_plan.preferred_filter(estimated_keys, enable_roaring, rate))
258+
}
259+
(Some(rate), None) => Some(BatchKeyFilter::new_bloom(estimated_keys, rate)),
260+
(None, Some(filter_plan)) if filter_plan.can_use_roaring(enable_roaring) => {
261+
Some(BatchKeyFilter::new_roaring_u32(filter_plan.min.as_ref()))
262+
}
263+
(None, _) => None,
264+
}
265+
}
266+
}

0 commit comments

Comments
 (0)