Skip to content

Commit 1ca1c6e

Browse files
committed
dbsp: Filter refactoring
- Remove legacy maybe_contains_key. - Reader no longer "owns" the bloom filter directly. - Filtering is handled by BatchFilters (which in the future can support multiple different filter impls). This is supposed to make it easier to customize filters in a follow-up PR. Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 228f64d commit 1ca1c6e

File tree

18 files changed

+326
-312
lines changed

18 files changed

+326
-312
lines changed

crates/dbsp/src/storage/file/reader.rs

Lines changed: 115 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use super::{AnyFactories, Deserializer, Factories};
77
use crate::dynamic::{DynVec, WeightTrait};
88
use crate::storage::buffer_cache::CacheAccess;
99
use crate::storage::file::format::{BatchMetadata, FilterBlock};
10-
use crate::storage::filter_stats::FilterStats;
1110
use crate::storage::tracking_bloom_filter::TrackingBloomFilter;
1211
use crate::storage::{
1312
backend::StorageError,
@@ -1547,7 +1546,6 @@ where
15471546
#[derive(Debug)]
15481547
pub struct Reader<T> {
15491548
file: ImmutableFileRef,
1550-
bloom_filter: Option<TrackingBloomFilter>,
15511549
columns: Vec<Column>,
15521550

15531551
/// Additional metadata added to the file by the writer.
@@ -1564,7 +1562,6 @@ where
15641562
{
15651563
fn size_of_children(&self, context: &mut size_of::Context) {
15661564
self.file.size_of_with_context(context);
1567-
context.add(self.membership_filter_stats().size_byte);
15681565
self.columns.size_of_with_context(context);
15691566
}
15701567
}
@@ -1578,8 +1575,17 @@ where
15781575
factories: &[&AnyFactories],
15791576
cache: fn() -> Arc<BufferCache>,
15801577
file: Arc<dyn FileReader>,
1581-
bloom_filter: Option<TrackingBloomFilter>,
15821578
) -> Result<Self, Error> {
1579+
let (reader, _membership_filter) = Self::new_with_filter(factories, cache, file, None)?;
1580+
Ok(reader)
1581+
}
1582+
1583+
pub(crate) fn new_with_filter(
1584+
factories: &[&AnyFactories],
1585+
cache: fn() -> Arc<BufferCache>,
1586+
file: Arc<dyn FileReader>,
1587+
membership_filter: Option<TrackingBloomFilter>,
1588+
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
15831589
let file_size = file.get_size()?;
15841590
if file_size < 512 || (file_size % 512) != 0 {
15851591
return Err(CorruptionError::InvalidFileSize(file_size).into());
@@ -1660,34 +1666,39 @@ where
16601666
)?
16611667
.into())
16621668
}
1663-
let bloom_filter = match bloom_filter {
1664-
Some(bloom_filter) => Some(bloom_filter),
1665-
None if file_trailer.has_filter64() => Some(read_filter_block(
1669+
let membership_filter = if let Some(membership_filter) = membership_filter {
1670+
Some(membership_filter)
1671+
} else if file_trailer.has_filter64() {
1672+
Some(read_filter_block(
16661673
&*file,
16671674
file_trailer.filter_offset64,
16681675
file_trailer.filter_size64 as usize,
1669-
)?),
1670-
None if file_trailer.filter_offset != 0 => Some(read_filter_block(
1676+
)?)
1677+
} else if file_trailer.filter_offset != 0 {
1678+
Some(read_filter_block(
16711679
&*file,
16721680
file_trailer.filter_offset,
16731681
file_trailer.filter_size as usize,
1674-
)?),
1675-
None => None,
1682+
)?)
1683+
} else {
1684+
None
16761685
};
16771686

1678-
Ok(Self {
1679-
file: ImmutableFileRef::new(
1680-
cache,
1681-
file,
1682-
file_trailer.compression,
1683-
stats,
1684-
file_trailer.version,
1685-
),
1686-
columns,
1687-
bloom_filter,
1688-
metadata: file_trailer.metadata.clone(),
1689-
_phantom: PhantomData,
1690-
})
1687+
Ok((
1688+
Self {
1689+
file: ImmutableFileRef::new(
1690+
cache,
1691+
file,
1692+
file_trailer.compression,
1693+
stats,
1694+
file_trailer.version,
1695+
),
1696+
columns,
1697+
metadata: file_trailer.metadata.clone(),
1698+
_phantom: PhantomData,
1699+
},
1700+
membership_filter,
1701+
))
16911702
}
16921703

16931704
/// Marks the file of the reader as being part of a checkpoint.
@@ -1702,7 +1713,16 @@ where
17021713
storage_backend: &dyn StorageBackend,
17031714
path: &StoragePath,
17041715
) -> Result<Self, Error> {
1705-
Self::new(factories, cache, storage_backend.open(path)?, None)
1716+
Self::new(factories, cache, storage_backend.open(path)?)
1717+
}
1718+
1719+
pub(crate) fn open_with_filter(
1720+
factories: &[&AnyFactories],
1721+
cache: fn() -> Arc<BufferCache>,
1722+
storage_backend: &dyn StorageBackend,
1723+
path: &StoragePath,
1724+
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
1725+
Self::new_with_filter(factories, cache, storage_backend.open(path)?, None)
17061726
}
17071727

17081728
/// The number of columns in the layer file.
@@ -1732,18 +1752,6 @@ where
17321752
Ok(self.file.file_handle.get_size()?)
17331753
}
17341754

1735-
/// Returns statistics of the membership filter, including its size in
1736-
/// bytes.
1737-
///
1738-
/// If the file doesn't have a Bloom filter, returns a default of zeros.
1739-
pub fn membership_filter_stats(&self) -> FilterStats {
1740-
if let Some(bloom_filter) = &self.bloom_filter {
1741-
bloom_filter.stats()
1742-
} else {
1743-
FilterStats::default()
1744-
}
1745-
}
1746-
17471755
/// Evict this file from the cache.
17481756
#[cfg(test)]
17491757
pub fn evict(&self) {
@@ -1786,13 +1794,6 @@ where
17861794
Ok(Some(root.key_range(&self.file, &factories)?))
17871795
}
17881796

1789-
/// Asks the bloom filter of the reader if we have the key.
1790-
pub fn maybe_contains_key(&self, hash: u64) -> bool {
1791-
self.bloom_filter
1792-
.as_ref()
1793-
.is_none_or(|b| b.contains_hash(hash))
1794-
}
1795-
17961797
/// Returns a [`RowGroup`] for all of the rows in column 0.
17971798
pub fn rows(&self) -> RowGroup<'_, K, A, N, (&'static K, &'static A, N)> {
17981799
RowGroup::new(self, 0, 0..self.columns[0].n_rows)
@@ -1815,6 +1816,13 @@ where
18151816
pub fn fetch_zset<'a, 'b>(
18161817
&'a self,
18171818
keys: &'b DynVec<K>,
1819+
) -> Result<FetchZSet<'a, 'b, K, A>, Error> {
1820+
self.fetch_zset_filtered(FilteredKeys::all(keys))
1821+
}
1822+
1823+
pub(crate) fn fetch_zset_filtered<'a, 'b>(
1824+
&'a self,
1825+
keys: FilteredKeys<'b, K>,
18181826
) -> Result<FetchZSet<'a, 'b, K, A>, Error> {
18191827
FetchZSet::new(self, keys)
18201828
}
@@ -1833,11 +1841,74 @@ where
18331841
pub fn fetch_indexed_zset<'a, 'b>(
18341842
&'a self,
18351843
keys: &'b DynVec<K0>,
1844+
) -> Result<FetchIndexedZSet<'a, 'b, K0, A0, K1, A1>, Error> {
1845+
self.fetch_indexed_zset_filtered(FilteredKeys::all(keys))
1846+
}
1847+
1848+
pub(crate) fn fetch_indexed_zset_filtered<'a, 'b>(
1849+
&'a self,
1850+
keys: FilteredKeys<'b, K0>,
18361851
) -> Result<FetchIndexedZSet<'a, 'b, K0, A0, K1, A1>, Error> {
18371852
FetchIndexedZSet::new(self, keys)
18381853
}
18391854
}
18401855

1856+
/// A `DynVec`, possibly filtered by a higher-level exact-seek filter chain.
1857+
pub(crate) struct FilteredKeys<'a, K>
1858+
where
1859+
K: DataTrait + ?Sized,
1860+
{
1861+
queried_keys: &'a DynVec<K>,
1862+
filter_pass_keys: Option<Vec<usize>>,
1863+
}
1864+
1865+
impl<'a, K> FilteredKeys<'a, K>
1866+
where
1867+
K: DataTrait + ?Sized,
1868+
{
1869+
pub(crate) fn all(queried_keys: &'a DynVec<K>) -> Self {
1870+
Self {
1871+
queried_keys,
1872+
filter_pass_keys: None,
1873+
}
1874+
}
1875+
1876+
pub(crate) fn with_filter_pass_keys(
1877+
queried_keys: &'a DynVec<K>,
1878+
filter_pass_keys: Option<Vec<usize>>,
1879+
) -> Self {
1880+
Self {
1881+
queried_keys,
1882+
filter_pass_keys,
1883+
}
1884+
}
1885+
1886+
pub(crate) fn len(&self) -> usize {
1887+
match &self.filter_pass_keys {
1888+
Some(filter_pass_keys) => filter_pass_keys.len(),
1889+
None => self.queried_keys.len(),
1890+
}
1891+
}
1892+
1893+
pub(crate) fn is_empty(&self) -> bool {
1894+
self.len() == 0
1895+
}
1896+
}
1897+
1898+
impl<K> Index<usize> for FilteredKeys<'_, K>
1899+
where
1900+
K: DataTrait + ?Sized,
1901+
{
1902+
type Output = K;
1903+
1904+
fn index(&self, index: usize) -> &Self::Output {
1905+
match &self.filter_pass_keys {
1906+
Some(filter_pass_keys) => &self.queried_keys[filter_pass_keys[index]],
1907+
None => &self.queried_keys[index],
1908+
}
1909+
}
1910+
}
1911+
18411912
/// A sorted, indexed group of unique rows in a [`Reader`].
18421913
///
18431914
/// Column 0 in a layer file has a single [`RowGroup`] that includes all of the
@@ -3114,80 +3185,3 @@ where
31143185
}
31153186
}
31163187
}
3117-
3118-
/// A `DynVec`, possibly filtered by a Bloom filter.
3119-
struct FilteredKeys<'b, K>
3120-
where
3121-
K: ?Sized,
3122-
{
3123-
/// Sorted array of keys to retrieve.
3124-
queried_keys: &'b DynVec<K>,
3125-
3126-
/// Indexes into `queried_keys` of the keys that pass the Bloom filter. If
3127-
/// this is `None`, then enough of the keys passed the Bloom filter that we
3128-
/// just take all of them.
3129-
bloom_keys: Option<Vec<usize>>,
3130-
}
3131-
3132-
impl<'b, K> FilteredKeys<'b, K>
3133-
where
3134-
K: DataTrait + ?Sized,
3135-
{
3136-
/// Returns `keys`, filtered using `reader.maybe_contains_key()`.
3137-
fn new<'a, A, N>(reader: &'a Reader<(&'static K, &'static A, N)>, keys: &'b DynVec<K>) -> Self
3138-
where
3139-
A: DataTrait + ?Sized,
3140-
N: ColumnSpec,
3141-
{
3142-
debug_assert!(keys.is_sorted_by(&|a, b| a.cmp(b)));
3143-
3144-
// Pass keys into the Bloom filter until 1/300th of them pass the Bloom
3145-
// filter. Empirically, this seems to good enough for the common case
3146-
// where the data passed into a "distinct" operator is actually distinct
3147-
// but we get some false positives from the Bloom filter. Because the
3148-
// keys that go into a "distinct" operator are often large, we don't
3149-
// want to pass all of them into the Bloom filter if we're going to have
3150-
// to deserialize them anyhow later.
3151-
let mut bloom_keys = SmallVec::<[_; 50]>::new();
3152-
for (index, key) in keys.dyn_iter().enumerate() {
3153-
if reader.maybe_contains_key(key.default_hash()) {
3154-
bloom_keys.push(index);
3155-
if bloom_keys.len() >= keys.len() / 300 {
3156-
return Self {
3157-
queried_keys: keys,
3158-
bloom_keys: None,
3159-
};
3160-
}
3161-
}
3162-
}
3163-
Self {
3164-
queried_keys: keys,
3165-
bloom_keys: Some(bloom_keys.into_vec()),
3166-
}
3167-
}
3168-
3169-
fn len(&self) -> usize {
3170-
match &self.bloom_keys {
3171-
Some(bloom_keys) => bloom_keys.len(),
3172-
None => self.queried_keys.len(),
3173-
}
3174-
}
3175-
3176-
fn is_empty(&self) -> bool {
3177-
self.len() == 0
3178-
}
3179-
}
3180-
3181-
impl<'b, K> Index<usize> for FilteredKeys<'b, K>
3182-
where
3183-
K: DataTrait + ?Sized,
3184-
{
3185-
type Output = K;
3186-
3187-
fn index(&self, index: usize) -> &Self::Output {
3188-
match &self.bloom_keys {
3189-
Some(bloom_keys) => &self.queried_keys[bloom_keys[index]],
3190-
None => &self.queried_keys[index],
3191-
}
3192-
}
3193-
}

crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ where
5050
{
5151
pub(super) fn new(
5252
reader: &'a Reader<(&'static K0, &'static A0, (&'static K1, &'static A1, ()))>,
53-
keys: &'b DynVec<K0>,
53+
keys: FilteredKeys<'b, K0>,
5454
) -> Result<Self, Error> {
5555
Ok(Self(FetchIndexedZSetInner::Column0(Some(Fetch0::new(
5656
reader, keys,
@@ -172,7 +172,7 @@ where
172172
{
173173
fn new(
174174
reader: &'a Reader<(&'static K, &'static A, N)>,
175-
keys: &'b DynVec<K>,
175+
keys: FilteredKeys<'b, K>,
176176
) -> Result<Self, Error> {
177177
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
178178
let factories = reader.columns[0].factories.factories();
@@ -185,7 +185,7 @@ where
185185
key_stack.reserve_exact(10);
186186

187187
let mut this = Self {
188-
keys: FilteredKeys::new(reader, keys),
188+
keys,
189189
reader,
190190
cache: (reader.file.cache)(),
191191
factories,

crates/dbsp/src/storage/file/reader/fetch_zset.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::super::Factories;
2-
use crate::dynamic::{DataTrait, DynVec, WeightTrait};
2+
use crate::dynamic::{DataTrait, WeightTrait};
33
use crate::storage::file::reader::{
44
DataBlock, Error, FilteredKeys, Reader, TreeBlock, TreeNode, decompress,
55
};
@@ -60,13 +60,13 @@ where
6060
{
6161
pub(super) fn new(
6262
reader: &'a Reader<(&'static K, &'static A, ())>,
63-
keys: &'b DynVec<K>,
63+
keys: FilteredKeys<'b, K>,
6464
) -> Result<Self, Error> {
6565
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
6666
let factories = reader.columns[0].factories.factories();
6767
let tmp_key = factories.key_factory.default_box();
6868
let mut this = Self {
69-
keys: FilteredKeys::new(reader, keys),
69+
keys,
7070
reader,
7171
cache: (reader.file.cache)(),
7272
factories,

0 commit comments

Comments
 (0)