Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
When navigating the codebase, look at relevant README.md for more project context.
- To gather more context beyond README.md:
- Look at the outstanding changes in the tree
- If on a branch check the last 2-3 commits
- Look at relevant README.md in sub-folders

- If on a branch, maybe check the last 2-3 commits for more context
- Look at the outstanding changes in the tree
- Write production quality code
- Make sure the code compiles
- Write production quality code.
- Adhere to rules in "Code Complete" by Steve McConnell.
- Adhere to rules in "The Art of Readable Code" by Dustin Boswell & Trevor Foucher.
- Make sure the code compiles.
- When adding code always ensure that tests cover the newly added code:
- Unit tests that validate for regular and exceptional inputs
- Use property based testing/model based testing/fuzzing when appropriate
- Integration tests for big platform-level features (in @python/tests)

At the start of every conversation, offer the user to run `scripts/claude.sh` to pull in shared LLM context files as unstaged changes. These should not be committed outside the `claude-context` branch.
- Add/update documentation and comments.
- Adhere to rules in "Bugs in Writing: A Guide to Debugging Your Prose" by Lyn Dupre.
- Adhere to rules in "The Elements of Style, Fourth Edition" by William Strunk Jr. & E. B. White

At the start of every conversation, offer the user to run `scripts/claude.sh` to pull in shared LLM context files as unstaged changes.
These should not be committed outside the `claude-context` branch.
2 changes: 1 addition & 1 deletion crates/dbsp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ futures = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
paste = { workspace = true }
seq-macro = { workspace = true }
derive_more = { workspace = true, features = ["add", "not", "from", "debug"] }
derive_more = { workspace = true, features = ["add", "add_assign", "sum", "not", "from", "debug"] }
dyn-clone = { workspace = true }
rand_chacha = { workspace = true }
tempfile = { workspace = true }
Expand Down
32 changes: 31 additions & 1 deletion crates/dbsp/src/circuit/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ pub const BLOOM_FILTER_MISSES_COUNT: MetricId =
pub const BLOOM_FILTER_HIT_RATE_PERCENT: MetricId =
MetricId(Cow::Borrowed("bloom_filter_hit_rate_percent"));
pub const BLOOM_FILTER_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("bloom_filter_size_bytes"));
/// Number of range-filter hits, summed across the range filters for all
/// batches in the spine.
pub const RANGE_FILTER_HITS_COUNT: MetricId = MetricId(Cow::Borrowed("range_filter_hits_count"));
/// Number of range-filter misses, summed across the range filters for all
/// batches in the spine.
pub const RANGE_FILTER_MISSES_COUNT: MetricId =
    MetricId(Cow::Borrowed("range_filter_misses_count"));
/// Hit rate of the range filter, as a percentage.
pub const RANGE_FILTER_HIT_RATE_PERCENT: MetricId =
    MetricId(Cow::Borrowed("range_filter_hit_rate_percent"));
/// Size of the cached range filter in bytes.
pub const RANGE_FILTER_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("range_filter_size_bytes"));
pub const SPINE_BATCHES_COUNT: MetricId = MetricId(Cow::Borrowed("spine_batches_count"));
pub const SPINE_STORAGE_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("spine_storage_size_bytes"));
pub const MERGING_SIZE_BYTES: MetricId = MetricId(Cow::Borrowed("merging_size_bytes"));
Expand All @@ -161,7 +167,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche
pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId =
MetricId(Cow::Borrowed("input_integral_records_count"));

pub const CIRCUIT_METRICS: [CircuitMetric; 66] = [
pub const CIRCUIT_METRICS: [CircuitMetric; 70] = [
// State
CircuitMetric {
name: USED_MEMORY_BYTES,
Expand Down Expand Up @@ -289,6 +295,30 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 66] = [
advanced: false,
description: "Hit rate of the Bloom filter.",
},
CircuitMetric {
name: RANGE_FILTER_SIZE_BYTES,
category: CircuitMetricCategory::State,
advanced: true,
description: "Size of the cached range filter in bytes.",
},
CircuitMetric {
name: RANGE_FILTER_HITS_COUNT,
category: CircuitMetricCategory::State,
advanced: false,
description: "The number of hits across all range filters. The hits are summed across the range filters for all batches in the spine.",
},
CircuitMetric {
name: RANGE_FILTER_MISSES_COUNT,
category: CircuitMetricCategory::State,
advanced: false,
description: "The number of misses across all range filters. The misses are summed across the range filters for all batches in the spine.",
},
CircuitMetric {
name: RANGE_FILTER_HIT_RATE_PERCENT,
category: CircuitMetricCategory::State,
advanced: false,
description: "Hit rate of the range filter.",
},
CircuitMetric {
name: RETAINMENT_BOUNDS,
category: CircuitMetricCategory::State,
Expand Down
1 change: 1 addition & 0 deletions crates/dbsp/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod backend;
pub mod buffer_cache;
pub mod dirlock;
pub mod file;
pub mod filter_stats;
pub mod tracking_bloom_filter;

use fdlimit::{Outcome::LimitRaised, raise_fd_limit};
Expand Down
75 changes: 69 additions & 6 deletions crates/dbsp/src/storage/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use super::{AnyFactories, Deserializer, Factories};
use crate::dynamic::{DynVec, WeightTrait};
use crate::storage::buffer_cache::CacheAccess;
use crate::storage::file::format::{BatchMetadata, FilterBlock};
use crate::storage::tracking_bloom_filter::{BloomFilterStats, TrackingBloomFilter};
use crate::storage::filter_stats::FilterStats;
use crate::storage::tracking_bloom_filter::TrackingBloomFilter;
use crate::storage::{
backend::StorageError,
buffer_cache::{BufferCache, FBuf},
Expand Down Expand Up @@ -655,6 +656,12 @@ where
DeserializeDyn::deserialize_with(item.fst(), key, &mut deserializer)
}
}
/// Reads the smallest and largest keys in this data block into `min` and
/// `max`.
///
/// # Safety
///
/// Same deserialization requirements as [`Self::key`]. Assumes the block
/// holds at least one value (`n_values() > 0`) — TODO(review): confirm
/// callers guarantee non-empty blocks.
unsafe fn key_range(&self, factories: &Factories<K, A>, min: &mut K, max: &mut K) {
    let last = self.n_values() - 1;
    // The two reads are independent; either order yields the same result.
    unsafe {
        self.key(factories, last, max);
        self.key(factories, 0, min);
    }
}
unsafe fn aux(&self, factories: &Factories<K, A>, index: usize, aux: &mut A) {
unsafe {
let item = self.archived_item(factories, index);
Expand Down Expand Up @@ -807,6 +814,33 @@ impl TreeNode {
NodeType::Index => Ok(TreeBlock::Index(IndexBlock::new(file, self)?)),
}
}

/// Reads the block this node points to and returns the `(min, max)` key
/// pair it covers, as freshly allocated boxes from the key factory.
fn key_range<K, A>(
    &self,
    file: &ImmutableFileRef,
    factories: &Factories<K, A>,
) -> Result<(Box<K>, Box<K>), Error>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    let mut lo = factories.key_factory.default_box();
    let mut hi = factories.key_factory.default_box();

    let block = self.read::<K, A>(file)?;
    match block {
        // SAFETY: Unsafe because of serialization
        TreeBlock::Data(data) => unsafe { data.key_range(factories, lo.as_mut(), hi.as_mut()) },
        // SAFETY: Unsafe because of serialization
        TreeBlock::Index(index) => unsafe { index.key_range(lo.as_mut(), hi.as_mut()) },
    }

    Ok((lo, hi))
}
}

enum TreeBlock<K: DataTrait + ?Sized, A: DataTrait + ?Sized> {
Expand Down Expand Up @@ -1112,6 +1146,17 @@ where
}
}

/// Writes the smallest and largest key bounds of this index block into
/// `min` and `max`.
///
/// # Safety
///
/// Same deserialization requirements as [`Self::get_bound`].
unsafe fn key_range(&self, min: &mut K, max: &mut K) {
    // Bound 0 is the smallest key; `max_bound` reads the last bound.
    unsafe {
        self.max_bound(max);
        self.get_bound(0, min);
    }
}

/// Reads the largest key bound stored in this index block into `bound`.
///
/// # Safety
///
/// Same deserialization requirements as [`Self::get_bound`].
unsafe fn max_bound(&self, bound: &mut K) {
    let index = self.last_bound_index();
    unsafe { self.get_bound(index, bound) }
}

/// Returns the index of the child of this index block that contains a row
/// in the range `target_rows` and which may contain a key for which
/// `compare` returns `bias` or [Equal], or `None` if there is no such
Expand Down Expand Up @@ -1205,6 +1250,10 @@ where
self.child_offsets.count
}

/// Index of the final (largest) bound: each child contributes two bounds,
/// so the last bound lives at `2 * n_children - 1`.
fn last_bound_index(&self) -> usize {
    2 * self.n_children() - 1
}

/// Returns the comparison of the largest bound key using `compare`.
unsafe fn compare_max<C>(&self, key_factory: &dyn Factory<K>, compare: &C) -> Ordering
where
Expand All @@ -1213,7 +1262,7 @@ where
unsafe {
let mut ordering = Equal;
key_factory.with(&mut |key| {
self.get_bound(self.n_children() * 2 - 1, key);
self.max_bound(key);
ordering = compare(key);
});
ordering
Expand Down Expand Up @@ -1515,7 +1564,7 @@ where
{
/// Accounts for heap memory reachable from this reader: the underlying
/// file handle, the cached membership filter, and per-column state.
///
/// NOTE: the pasted diff left both the old `filter_stats()` call and the
/// renamed `membership_filter_stats()` call in place; only the renamed
/// call is kept here.
fn size_of_children(&self, context: &mut size_of::Context) {
    self.file.size_of_with_context(context);
    context.add(self.membership_filter_stats().size_byte);
    self.columns.size_of_with_context(context);
}
}
Expand Down Expand Up @@ -1683,14 +1732,15 @@ where
Ok(self.file.file_handle.get_size()?)
}

/// Returns statistics of the Bloom filter, including its size in bytes.
/// Returns statistics of the membership filter, including its size in
/// bytes.
///
/// If the file doesn't have a Bloom filter, returns a default of zeros.
pub fn filter_stats(&self) -> BloomFilterStats {
pub fn membership_filter_stats(&self) -> FilterStats {
if let Some(bloom_filter) = &self.bloom_filter {
bloom_filter.stats()
} else {
BloomFilterStats::default()
FilterStats::default()
}
}

Expand Down Expand Up @@ -1723,6 +1773,19 @@ where
A: DataTrait + ?Sized,
(&'static K, &'static A, N): ColumnSpec,
{
/// Returns the min and max keys stored in column 0.
///
/// The bounds are loaded from the root node when first requested and can
/// then be cached by higher-level batch types.
pub fn key_range(&self) -> Result<Option<(Box<K>, Box<K>)>, Error> {
Comment thread
gz marked this conversation as resolved.
let Some(root) = self.columns[0].root.as_ref() else {
return Ok(None);
};

let factories = self.columns[0].factories.factories::<K, A>();
Ok(Some(root.key_range(&self.file, &factories)?))
}

/// Asks the bloom filter of the reader if we have the key.
pub fn maybe_contains_key(&self, hash: u64) -> bool {
self.bloom_filter
Expand Down
Loading
Loading