@@ -7,7 +7,6 @@ use super::{AnyFactories, Deserializer, Factories};
77use crate :: dynamic:: { DynVec , WeightTrait } ;
88use crate :: storage:: buffer_cache:: CacheAccess ;
99use crate :: storage:: file:: format:: { BatchMetadata , FilterBlock } ;
10- use crate :: storage:: filter_stats:: FilterStats ;
1110use crate :: storage:: tracking_bloom_filter:: TrackingBloomFilter ;
1211use crate :: storage:: {
1312 backend:: StorageError ,
@@ -1547,7 +1546,6 @@ where
15471546#[ derive( Debug ) ]
15481547pub struct Reader < T > {
15491548 file : ImmutableFileRef ,
1550- bloom_filter : Option < TrackingBloomFilter > ,
15511549 columns : Vec < Column > ,
15521550
15531551 /// Additional metadata added to the file by the writer.
@@ -1564,7 +1562,6 @@ where
15641562{
15651563 fn size_of_children ( & self , context : & mut size_of:: Context ) {
15661564 self . file . size_of_with_context ( context) ;
1567- context. add ( self . membership_filter_stats ( ) . size_byte ) ;
15681565 self . columns . size_of_with_context ( context) ;
15691566 }
15701567}
@@ -1578,8 +1575,17 @@ where
15781575 factories : & [ & AnyFactories ] ,
15791576 cache : fn ( ) -> Arc < BufferCache > ,
15801577 file : Arc < dyn FileReader > ,
1581- bloom_filter : Option < TrackingBloomFilter > ,
15821578 ) -> Result < Self , Error > {
1579+ let ( reader, _membership_filter) = Self :: new_with_filter ( factories, cache, file, None ) ?;
1580+ Ok ( reader)
1581+ }
1582+
1583+ pub ( crate ) fn new_with_filter (
1584+ factories : & [ & AnyFactories ] ,
1585+ cache : fn ( ) -> Arc < BufferCache > ,
1586+ file : Arc < dyn FileReader > ,
1587+ membership_filter : Option < TrackingBloomFilter > ,
1588+ ) -> Result < ( Self , Option < TrackingBloomFilter > ) , Error > {
15831589 let file_size = file. get_size ( ) ?;
15841590 if file_size < 512 || ( file_size % 512 ) != 0 {
15851591 return Err ( CorruptionError :: InvalidFileSize ( file_size) . into ( ) ) ;
@@ -1660,34 +1666,39 @@ where
16601666 ) ?
16611667 . into ( ) )
16621668 }
1663- let bloom_filter = match bloom_filter {
1664- Some ( bloom_filter) => Some ( bloom_filter) ,
1665- None if file_trailer. has_filter64 ( ) => Some ( read_filter_block (
1669+ let membership_filter = if let Some ( membership_filter) = membership_filter {
1670+ Some ( membership_filter)
1671+ } else if file_trailer. has_filter64 ( ) {
1672+ Some ( read_filter_block (
16661673 & * file,
16671674 file_trailer. filter_offset64 ,
16681675 file_trailer. filter_size64 as usize ,
1669- ) ?) ,
1670- None if file_trailer. filter_offset != 0 => Some ( read_filter_block (
1676+ ) ?)
1677+ } else if file_trailer. filter_offset != 0 {
1678+ Some ( read_filter_block (
16711679 & * file,
16721680 file_trailer. filter_offset ,
16731681 file_trailer. filter_size as usize ,
1674- ) ?) ,
1675- None => None ,
1682+ ) ?)
1683+ } else {
1684+ None
16761685 } ;
16771686
1678- Ok ( Self {
1679- file : ImmutableFileRef :: new (
1680- cache,
1681- file,
1682- file_trailer. compression ,
1683- stats,
1684- file_trailer. version ,
1685- ) ,
1686- columns,
1687- bloom_filter,
1688- metadata : file_trailer. metadata . clone ( ) ,
1689- _phantom : PhantomData ,
1690- } )
1687+ Ok ( (
1688+ Self {
1689+ file : ImmutableFileRef :: new (
1690+ cache,
1691+ file,
1692+ file_trailer. compression ,
1693+ stats,
1694+ file_trailer. version ,
1695+ ) ,
1696+ columns,
1697+ metadata : file_trailer. metadata . clone ( ) ,
1698+ _phantom : PhantomData ,
1699+ } ,
1700+ membership_filter,
1701+ ) )
16911702 }
16921703
16931704 /// Marks the file of the reader as being part of a checkpoint.
@@ -1702,7 +1713,16 @@ where
17021713 storage_backend : & dyn StorageBackend ,
17031714 path : & StoragePath ,
17041715 ) -> Result < Self , Error > {
1705- Self :: new ( factories, cache, storage_backend. open ( path) ?, None )
1716+ Self :: new ( factories, cache, storage_backend. open ( path) ?)
1717+ }
1718+
1719+ pub ( crate ) fn open_with_filter (
1720+ factories : & [ & AnyFactories ] ,
1721+ cache : fn ( ) -> Arc < BufferCache > ,
1722+ storage_backend : & dyn StorageBackend ,
1723+ path : & StoragePath ,
1724+ ) -> Result < ( Self , Option < TrackingBloomFilter > ) , Error > {
1725+ Self :: new_with_filter ( factories, cache, storage_backend. open ( path) ?, None )
17061726 }
17071727
17081728 /// The number of columns in the layer file.
@@ -1732,18 +1752,6 @@ where
17321752 Ok ( self . file . file_handle . get_size ( ) ?)
17331753 }
17341754
1735- /// Returns statistics of the membership filter, including its size in
1736- /// bytes.
1737- ///
1738- /// If the file doesn't have a Bloom filter, returns a default of zeros.
1739- pub fn membership_filter_stats ( & self ) -> FilterStats {
1740- if let Some ( bloom_filter) = & self . bloom_filter {
1741- bloom_filter. stats ( )
1742- } else {
1743- FilterStats :: default ( )
1744- }
1745- }
1746-
17471755 /// Evict this file from the cache.
17481756 #[ cfg( test) ]
17491757 pub fn evict ( & self ) {
@@ -1786,13 +1794,6 @@ where
17861794 Ok ( Some ( root. key_range ( & self . file , & factories) ?) )
17871795 }
17881796
1789- /// Asks the bloom filter of the reader if we have the key.
1790- pub fn maybe_contains_key ( & self , hash : u64 ) -> bool {
1791- self . bloom_filter
1792- . as_ref ( )
1793- . is_none_or ( |b| b. contains_hash ( hash) )
1794- }
1795-
17961797 /// Returns a [`RowGroup`] for all of the rows in column 0.
17971798 pub fn rows ( & self ) -> RowGroup < ' _ , K , A , N , ( & ' static K , & ' static A , N ) > {
17981799 RowGroup :: new ( self , 0 , 0 ..self . columns [ 0 ] . n_rows )
@@ -1815,6 +1816,13 @@ where
18151816 pub fn fetch_zset < ' a , ' b > (
18161817 & ' a self ,
18171818 keys : & ' b DynVec < K > ,
1819+ ) -> Result < FetchZSet < ' a , ' b , K , A > , Error > {
1820+ self . fetch_zset_filtered ( FilteredKeys :: all ( keys) )
1821+ }
1822+
1823+ pub ( crate ) fn fetch_zset_filtered < ' a , ' b > (
1824+ & ' a self ,
1825+ keys : FilteredKeys < ' b , K > ,
18181826 ) -> Result < FetchZSet < ' a , ' b , K , A > , Error > {
18191827 FetchZSet :: new ( self , keys)
18201828 }
@@ -1833,11 +1841,74 @@ where
18331841 pub fn fetch_indexed_zset < ' a , ' b > (
18341842 & ' a self ,
18351843 keys : & ' b DynVec < K0 > ,
1844+ ) -> Result < FetchIndexedZSet < ' a , ' b , K0 , A0 , K1 , A1 > , Error > {
1845+ self . fetch_indexed_zset_filtered ( FilteredKeys :: all ( keys) )
1846+ }
1847+
1848+ pub ( crate ) fn fetch_indexed_zset_filtered < ' a , ' b > (
1849+ & ' a self ,
1850+ keys : FilteredKeys < ' b , K0 > ,
18361851 ) -> Result < FetchIndexedZSet < ' a , ' b , K0 , A0 , K1 , A1 > , Error > {
18371852 FetchIndexedZSet :: new ( self , keys)
18381853 }
18391854}
18401855
1856+ /// A `DynVec`, possibly filtered by a higher-level exact-seek filter chain.
1857+ pub ( crate ) struct FilteredKeys < ' a , K >
1858+ where
1859+ K : DataTrait + ?Sized ,
1860+ {
1861+ queried_keys : & ' a DynVec < K > ,
1862+ filter_pass_keys : Option < Vec < usize > > ,
1863+ }
1864+
1865+ impl < ' a , K > FilteredKeys < ' a , K >
1866+ where
1867+ K : DataTrait + ?Sized ,
1868+ {
1869+ pub ( crate ) fn all ( queried_keys : & ' a DynVec < K > ) -> Self {
1870+ Self {
1871+ queried_keys,
1872+ filter_pass_keys : None ,
1873+ }
1874+ }
1875+
1876+ pub ( crate ) fn with_filter_pass_keys (
1877+ queried_keys : & ' a DynVec < K > ,
1878+ filter_pass_keys : Option < Vec < usize > > ,
1879+ ) -> Self {
1880+ Self {
1881+ queried_keys,
1882+ filter_pass_keys,
1883+ }
1884+ }
1885+
1886+ pub ( crate ) fn len ( & self ) -> usize {
1887+ match & self . filter_pass_keys {
1888+ Some ( filter_pass_keys) => filter_pass_keys. len ( ) ,
1889+ None => self . queried_keys . len ( ) ,
1890+ }
1891+ }
1892+
1893+ pub ( crate ) fn is_empty ( & self ) -> bool {
1894+ self . len ( ) == 0
1895+ }
1896+ }
1897+
1898+ impl < K > Index < usize > for FilteredKeys < ' _ , K >
1899+ where
1900+ K : DataTrait + ?Sized ,
1901+ {
1902+ type Output = K ;
1903+
1904+ fn index ( & self , index : usize ) -> & Self :: Output {
1905+ match & self . filter_pass_keys {
1906+ Some ( filter_pass_keys) => & self . queried_keys [ filter_pass_keys[ index] ] ,
1907+ None => & self . queried_keys [ index] ,
1908+ }
1909+ }
1910+ }
1911+
18411912/// A sorted, indexed group of unique rows in a [`Reader`].
18421913///
18431914/// Column 0 in a layer file has a single [`RowGroup`] that includes all of the
@@ -3114,80 +3185,3 @@ where
31143185 }
31153186 }
31163187}
3117-
3118- /// A `DynVec`, possibly filtered by a Bloom filter.
3119- struct FilteredKeys < ' b , K >
3120- where
3121- K : ?Sized ,
3122- {
3123- /// Sorted array to keys to retrieve.
3124- queried_keys : & ' b DynVec < K > ,
3125-
3126- /// Indexes into `queried_keys` of the keys that pass the Bloom filter. If
3127- /// this is `None`, then enough of the keys passed the Bloom filter that we
3128- /// just take all of them.
3129- bloom_keys : Option < Vec < usize > > ,
3130- }
3131-
3132- impl < ' b , K > FilteredKeys < ' b , K >
3133- where
3134- K : DataTrait + ?Sized ,
3135- {
3136- /// Returns `keys`, filtered using `reader.maybe_contains_key()`.
3137- fn new < ' a , A , N > ( reader : & ' a Reader < ( & ' static K , & ' static A , N ) > , keys : & ' b DynVec < K > ) -> Self
3138- where
3139- A : DataTrait + ?Sized ,
3140- N : ColumnSpec ,
3141- {
3142- debug_assert ! ( keys. is_sorted_by( & |a, b| a. cmp( b) ) ) ;
3143-
3144- // Pass keys into the Bloom filter until 1/300th of them pass the Bloom
3145- // filter. Empirically, this seems to good enough for the common case
3146- // where the data passed into a "distinct" operator is actually distinct
3147- // but we get some false positives from the Bloom filter. Because the
3148- // keys that go into a "distinct" operator are often large, we don't
3149- // want to pass all of them into the Bloom filter if we're going to have
3150- // to deserialize them anyhow later.
3151- let mut bloom_keys = SmallVec :: < [ _ ; 50 ] > :: new ( ) ;
3152- for ( index, key) in keys. dyn_iter ( ) . enumerate ( ) {
3153- if reader. maybe_contains_key ( key. default_hash ( ) ) {
3154- bloom_keys. push ( index) ;
3155- if bloom_keys. len ( ) >= keys. len ( ) / 300 {
3156- return Self {
3157- queried_keys : keys,
3158- bloom_keys : None ,
3159- } ;
3160- }
3161- }
3162- }
3163- Self {
3164- queried_keys : keys,
3165- bloom_keys : Some ( bloom_keys. into_vec ( ) ) ,
3166- }
3167- }
3168-
3169- fn len ( & self ) -> usize {
3170- match & self . bloom_keys {
3171- Some ( bloom_keys) => bloom_keys. len ( ) ,
3172- None => self . queried_keys . len ( ) ,
3173- }
3174- }
3175-
3176- fn is_empty ( & self ) -> bool {
3177- self . len ( ) == 0
3178- }
3179- }
3180-
3181- impl < ' b , K > Index < usize > for FilteredKeys < ' b , K >
3182- where
3183- K : DataTrait + ?Sized ,
3184- {
3185- type Output = K ;
3186-
3187- fn index ( & self , index : usize ) -> & Self :: Output {
3188- match & self . bloom_keys {
3189- Some ( bloom_keys) => & self . queried_keys [ bloom_keys[ index] ] ,
3190- None => & self . queried_keys [ index] ,
3191- }
3192- }
3193- }
0 commit comments