Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[dbsp] Add circuit profile metadata.
Add circuit profile metadata for inlined values and omitted bounds.

Signed-off-by: Ben Pfaff <blp@feldera.com>
  • Loading branch information
blp committed Dec 24, 2025
commit 3ed076ddb0fbadad56029096bc9d4ba63fdb6181
12 changes: 11 additions & 1 deletion crates/dbsp/src/circuit/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl BatchSizeStats {
/// General metadata about an operator's execution
#[derive(Debug, Clone, PartialEq, Default, Serialize)]
pub struct OperatorMeta {
entries: Vec<(MetaLabel, MetaItem)>,
pub entries: Vec<(MetaLabel, MetaItem)>,
}

impl OperatorMeta {
Expand All @@ -145,6 +145,9 @@ impl OperatorMeta {
.map(|(_label, item)| item.clone())
}

/// Merges the mergeable entries in `other` into this operator metadata.
///
/// See [MetaItem::merge] to learn about merging metadata.
pub fn merge(&mut self, other: &Self) {
for (label, src) in &other.entries {
if src.is_mergeable() {
Expand Down Expand Up @@ -338,6 +341,7 @@ impl MetaItem {
}
}

/// Returns whether this kind of metadata item is mergeable.
pub fn is_mergeable(&self) -> bool {
matches!(
self,
Expand All @@ -349,6 +353,12 @@ impl MetaItem {
)
}

/// Attempts to merge `self` and `other` and returns the result if there is
/// one.
///
/// Counts are merged by adding them, percents are merged by adding the
/// numerator and denominator, and so on. Arbitrary strings and integers
/// can't be merged.
pub fn merge(&self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Count(a), Self::Count(b)) => Some(Self::Count(a + b)),
Expand Down
18 changes: 12 additions & 6 deletions crates/dbsp/src/storage/file/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,8 @@ where
layer_file.write0((&T::key0(row0), &T::aux0(row0))).unwrap();
}

let reader = layer_file.into_reader().unwrap();
let (reader, column_info) = layer_file.into_reader().unwrap();
dbg!(column_info);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are four dbg!() calls left in this file (here plus lines 633, 803, 860). Please remove them before merge — they look like leftover debugging output rather than intentional diagnostics.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's wrong, they'll only show up if a test fails.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point that cargo test captures stderr by default and only shows it on failure. But dbg!() is a temporary debugging macro — it executes on every test run (allocating, formatting, and printing), just silently. The right tool for persistent diagnostic output is tracing::debug!() or eprintln! with a descriptive comment. I'd still ask that these be removed before merge.

reader.evict();
let rows0 = reader.rows();
let expected0 = |row0| {
Expand Down Expand Up @@ -628,7 +629,8 @@ where
layer_file.write0((&T::key0(row0), &T::aux0(row0))).unwrap();
}

let reader = layer_file.into_reader().unwrap();
let (reader, column_info) = layer_file.into_reader().unwrap();
dbg!(column_info);
reader.evict();
test_multifetch_two_columns::<T>(&reader);
}
Expand Down Expand Up @@ -787,7 +789,7 @@ where
let reader = if reopen {
println!("closing writer and reopening as reader");
let path = writer.path().clone();
let (_file_handle, _bloom_filter) = writer.close().unwrap();
let (_file_handle, _bloom_filter, _column_info) = writer.close().unwrap();
Reader::open(
&[&factories.any_factories()],
test_buffer_cache,
Expand All @@ -797,7 +799,9 @@ where
.unwrap()
} else {
println!("transforming writer into reader");
writer.into_reader().unwrap()
let (reader, column_info) = writer.into_reader().unwrap();
dbg!(column_info);
reader
};
reader.evict();
assert_eq!(reader.rows().len(), n as u64);
Expand Down Expand Up @@ -842,7 +846,7 @@ fn test_one_column_zset<K, A>(
let reader = if reopen {
println!("closing writer and reopening as reader");
let path = writer.path().clone();
let (_file_handle, _bloom_filter) = writer.close().unwrap();
let (_file_handle, _bloom_filter, _column_info) = writer.close().unwrap();
Reader::open(
&[&factories.any_factories()],
test_buffer_cache,
Expand All @@ -852,7 +856,9 @@ fn test_one_column_zset<K, A>(
.unwrap()
} else {
println!("transforming writer into reader");
writer.into_reader().unwrap()
let (reader, column_info) = writer.into_reader().unwrap();
dbg!(column_info);
reader
};
reader.evict();
assert_eq!(reader.rows().len(), n as u64);
Expand Down
Loading
Loading