From b297bb026285a431be2de724f2763c657a9ec58e Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Sat, 4 Apr 2026 00:51:24 -0700 Subject: [PATCH 1/2] assertions Signed-off-by: Leonid Ryzhyk --- crates/dbsp/src/operator/dynamic/input.rs | 146 +++++++++++++++++- .../dbsp/src/operator/dynamic/input_upsert.rs | 21 ++- 2 files changed, 164 insertions(+), 3 deletions(-) diff --git a/crates/dbsp/src/operator/dynamic/input.rs b/crates/dbsp/src/operator/dynamic/input.rs index 18470f651af..675651edde9 100644 --- a/crates/dbsp/src/operator/dynamic/input.rs +++ b/crates/dbsp/src/operator/dynamic/input.rs @@ -1390,7 +1390,15 @@ mod test { zset, }; use anyhow::Result as AnyResult; - use std::{cmp::max, collections::VecDeque, iter::once, ops::Mul}; + use rand::seq::index; + use rand::{Rng, SeedableRng}; + use rand_chacha::ChaCha8Rng; + use std::{ + cmp::max, + collections::{BTreeSet, HashMap, VecDeque}, + iter::once, + ops::Mul, + }; fn input_batches() -> Vec> { vec![ @@ -2118,6 +2126,142 @@ mod test { dbsp.kill().unwrap(); } + /// Split `items` into `k` contiguous non-empty segments (`k` in 1..=min(3, n)). + fn partition_into_k_contiguous_batches( + items: Vec, + k: usize, + rng: &mut ChaCha8Rng, + ) -> Vec> { + let n = items.len(); + debug_assert!(k >= 1); + if n == 0 { + return vec![]; + } + let k = k.min(n).max(1); + if k == 1 { + return vec![items]; + } + let split_at: Vec = index::sample(rng, n - 1, k - 1) + .into_iter() + .map(|i| i + 1) + .collect(); + let mut split_at = split_at; + split_at.sort_unstable(); + let mut out = Vec::with_capacity(k); + let mut start = 0usize; + for cut in split_at { + out.push(items[start..cut].to_vec()); + start = cut; + } + out.push(items[start..].to_vec()); + out + } + + fn apply_map_update(state: &mut HashMap, key: u64, upd: Update) { + match upd { + Update::Insert(v) => { + state.insert(key, v); + } + Update::Delete => { + state.remove(&key); + } + Update::Update(v) => { + if state.contains_key(&key) { + state.insert(key, v); + } + } + } + } + + fn indexed_zset_state_diff( + before: &HashMap, + after: &HashMap, + ) -> OrdIndexedZSet { + let keys: BTreeSet = before.keys().chain(after.keys()).copied().collect(); + let mut tuples = Vec::new(); + for k in keys { + let old_v = before.get(&k).copied(); + let new_v = after.get(&k).copied(); + match (old_v, new_v) { + (None, None) => {} + (None, Some(nv)) => tuples.push(Tup2(Tup2(k, nv), 1)), + (Some(ov), None) => tuples.push(Tup2(Tup2(k, ov), -1)), + (Some(ov), Some(nv)) if ov != nv => { + tuples.push(Tup2(Tup2(k, ov), -1)); + tuples.push(Tup2(Tup2(k, nv), 1)); + } + _ => {} + } + } + OrdIndexedZSet::from_tuples((), tuples) + } + + /// Randomized stress test: many updates to three keys split across up to three `stage().flush()` + /// calls per transaction. The accumulated output must match applying updates in order (last + /// update per key wins within the folded semantics of [`InputUpsert`]). + #[test] + fn map_randomized_staged_batches_accumulate_output() { + let (mut dbsp, (input_handle, output_handle)) = Runtime::init_circuit(1, |circuit| { + let (stream, handle) = circuit.add_input_map::(|v, u| *v = *u); + Ok((handle, stream.accumulate_output())) + }) + .unwrap(); + + let mut state: HashMap = HashMap::new(); + let mut rng = ChaCha8Rng::seed_from_u64(0x_6D61_705F_7374_6167_u64); + + for step in 0..128000 { + println!("step {}", step); + let before = state.clone(); + let num_updates = rng.gen_range(0..=12); + + let updates: Vec>> = (0..num_updates) + .map(|_| { + let key = rng.gen_range(1u64..=3); + let upd = match rng.gen_range(0..3) { + 0 => Update::Insert(rng.gen_range(0u64..512)), + 1 => Update::Delete, + 2 => Update::Update(rng.gen_range(0u64..=512)), + _ => unreachable!(), + }; + Tup2(key, upd) + }) + .collect(); + + for Tup2(k, u) in &updates { + apply_map_update(&mut state, *k, u.clone()); + } + + let num_batches = if num_updates == 0 { + 1usize + } else { + rng.gen_range(1..=std::cmp::min(3, num_updates)) + }; + + let batches = if num_updates == 0 { + vec![Vec::new()] + } else { + partition_into_k_contiguous_batches(updates, num_batches, &mut rng) + }; + + for batch in batches { + println!("batch {:?}", batch); + input_handle.stage(once(VecDeque::from(batch))).flush(); + } + + dbsp.transaction().unwrap(); + + let expected = indexed_zset_state_diff(&before, &state); + assert_eq!( + output_handle.concat().consolidate(), + expected, + "accumulated output should equal the net map change for this transaction" + ); + } + + dbsp.kill().unwrap(); + } + fn map_with_waterline_test_circuit( circuit: &RootCircuit, ) -> ( diff --git a/crates/dbsp/src/operator/dynamic/input_upsert.rs b/crates/dbsp/src/operator/dynamic/input_upsert.rs index 8f98b652411..ecc3a576d51 100644 --- a/crates/dbsp/src/operator/dynamic/input_upsert.rs +++ b/crates/dbsp/src/operator/dynamic/input_upsert.rs @@ -549,10 +549,10 @@ where async fn eval( &mut self, trace: &T, - updates: &Vec>>>, + input_updates: &Vec>>>, ) -> B { // Inputs must be sorted by key - let mut updates = updates + let mut updates = input_updates .iter() .filter_map(|updates| { if !updates.is_empty() { @@ -618,6 +618,14 @@ where if !key_updates.is_empty() { for pair in key_updates.dyn_iter_mut() { let (v, d) = pair.split_mut(); + if **d > 1 { + println!( + "invalid update key: {:?}, value: {:?}, weight: {:?}", + cur_key, v, d + ); + println!("inputs: {:?}", input_updates); + panic!(); + } builder.push_val_diff_mut(v, d); } builder.push_key(cur_key); @@ -684,6 +692,15 @@ where if !key_updates.is_empty() { for pair in key_updates.dyn_iter_mut() { let (v, d) = pair.split_mut(); + if **d > 1 { + println!( + "invalid update key: {:?}, value: {:?}, weight: {:?}", + cur_key, v, d + ); + println!("inputs: {:?}", input_updates); + panic!(); + } + builder.push_val_diff_mut(v, d); } builder.push_key(cur_key); From 7e00e06fcd8ad8029d5b69d6afaf816a8c66c799 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Sat, 4 Apr 2026 12:12:24 -0700 Subject: [PATCH 2/2] print metadata --- crates/adapterlib/src/format.rs | 2 +- crates/adapters/src/controller.rs | 11 ++++-- crates/adapters/src/controller/stats.rs | 34 +++++++++++++++++-- crates/adapters/src/format/avro/test.rs | 18 +++++----- crates/adapters/src/format/json/output.rs | 2 +- crates/adapters/src/format/parquet/test.rs | 2 +- .../src/integrated/delta_table/output.rs | 25 +++++++++++--- .../src/integrated/postgres/output.rs | 4 +-- .../adapters/src/test/mock_output_consumer.rs | 2 +- 9 files changed, 76 insertions(+), 24 deletions(-) diff --git a/crates/adapterlib/src/format.rs b/crates/adapterlib/src/format.rs index 3ce6d14520c..0c9ddf20d72 100644 --- a/crates/adapterlib/src/format.rs +++ b/crates/adapterlib/src/format.rs @@ -682,7 +682,7 @@ pub trait OutputConsumer: Send { /// The encoder should not generate buffers exceeding this size. fn max_buffer_size_bytes(&self) -> usize; - fn batch_start(&mut self, step: Step); + fn batch_start(&mut self, step: Step, processed_records: usize); /// See OutputEndpoint::push_buffer. fn push_buffer(&mut self, buffer: &[u8], num_records: usize); diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 94a16110029..2c3422b2d3c 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -6253,9 +6253,10 @@ impl ControllerInner { endpoint_name: &str, encoder: &mut dyn Encoder, step: Step, + processed_records: usize, controller: &ControllerInner, ) { - encoder.consumer().batch_start(step); + encoder.consumer().batch_start(step, processed_records); encoder.encode(batch).unwrap_or_else(|e| { controller.encode_error(endpoint_id, endpoint_name, e, Some("encoder_error")) }); @@ -6300,6 +6301,9 @@ impl ControllerInner { &endpoint_name, encoder.as_mut(), output_buffer.buffered_step, + output_buffer + .buffered_processed_records + .total_processed_input_records as usize, &controller, ); @@ -6346,6 +6350,9 @@ impl ControllerInner { &endpoint_name, encoder.as_mut(), step, + processed_records + .map(|r| r.total_processed_input_records as usize) + .unwrap_or(0), &controller, ); } @@ -7127,7 +7134,7 @@ impl OutputConsumer for OutputProbe { self.endpoint.max_buffer_size_bytes() } - fn batch_start(&mut self, step: Step) { + fn batch_start(&mut self, step: Step, _processed_records: usize) { self.endpoint.batch_start(step).unwrap_or_else(|e| { self.controller.output_transport_error( self.endpoint_id, diff --git a/crates/adapters/src/controller/stats.rs b/crates/adapters/src/controller/stats.rs index c5adb94de30..a5b9c210036 100644 --- a/crates/adapters/src/controller/stats.rs +++ b/crates/adapters/src/controller/stats.rs @@ -484,6 +484,16 @@ impl ControllerStatus { self.global_metrics.state.load(Ordering::Relaxed) } + pub fn metadata_as_of(&self, offset: u64) -> BTreeMap> { + let mut result = BTreeMap::new(); + + for (endpoint_id, input_status) in self.inputs.read().iter() { + result.insert(endpoint_id.to_string(), input_status.metadata_as_of(offset)); + } + + result + } + /// Set the state to `desired`. /// /// Setting the state to [PipelineState::Terminated] is permanent; the state @@ -1712,6 +1722,10 @@ impl WatermarkTracker { pub fn completed_watermark(&self) -> Option { self.0.lock().unwrap().completed_watermark.clone() } + + pub fn metadata_as_of(&self, offset: u64) -> Option { + self.0.lock().unwrap().as_of(offset) + } } /// Bound the number of tracked watermarks to avoid unbounded memory growth. @@ -1860,10 +1874,22 @@ impl WatermarkTrackerInner { } } } + + fn as_of(&self, offset: u64) -> Option { + let mut watermark = None; + for entry in self.watermark_with_metadata_list.iter() { + if entry.global_offset <= offset { + watermark = Some(entry.clone()); + } else { + break; + } + } + watermark + } } -#[derive(Debug)] -struct WatermarkListEntry { +#[derive(Debug, Clone)] +pub struct WatermarkListEntry { watermark: Watermark, global_offset: u64, processed_at: Option>, @@ -1965,6 +1991,10 @@ impl InputEndpointStatus { completed_frontier: self.completed_frontier.completed_watermark(), } } + + pub fn metadata_as_of(&self, offset: u64) -> Option { + self.completed_frontier.metadata_as_of(offset) + } } impl InputEndpointStatus { diff --git a/crates/adapters/src/format/avro/test.rs b/crates/adapters/src/format/avro/test.rs index e9f78a9c2c7..137306d69ad 100644 --- a/crates/adapters/src/format/avro/test.rs +++ b/crates/adapters/src/format/avro/test.rs @@ -1259,7 +1259,7 @@ where }) .collect::>(); for (step, zset) in zsets.into_iter().enumerate() { - encoder.consumer().batch_start(step as u64); + encoder.consumer().batch_start(step as u64, 0); encoder.encode(zset.arc_as_batch_reader()).unwrap(); encoder.consumer().batch_end(); } @@ -1356,7 +1356,7 @@ fn test_raw_avro_output_indexed( }) .collect::>(); for (step, zset) in zsets.into_iter().enumerate() { - encoder.consumer().batch_start(step as u64); + encoder.consumer().batch_start(step as u64, 0); encoder.encode(zset.arc_as_batch_reader()).unwrap(); encoder.consumer().batch_end(); } @@ -1448,7 +1448,7 @@ fn test_confluent_avro_output( }) .collect::>(); for (step, zset) in zsets.into_iter().enumerate() { - encoder.consumer().batch_start(step as u64); + encoder.consumer().batch_start(step as u64, 0); encoder.encode(zset.arc_as_batch_reader()).unwrap(); encoder.consumer().batch_end(); } @@ -1557,7 +1557,7 @@ fn test_confluent_avro_output_indexed( .collect::>(); for (step, zset) in zsets.into_iter().enumerate() { - encoder.consumer().batch_start(step as u64); + encoder.consumer().batch_start(step as u64, 0); encoder.encode(zset.arc_as_batch_reader()).unwrap(); encoder.consumer().batch_end(); } @@ -1644,7 +1644,7 @@ fn test_non_unique_keys() { let zset = OrdIndexedZSet::from_tuples((), vec![Tup2(Tup2(k1.clone(), v1.clone()), 2)]); let zset = Arc::new(>::new(zset)) as Arc; - encoder.consumer().batch_start(0); + encoder.consumer().batch_start(0, 0); let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err(); assert!(err.to_string().contains(r#"is inserted 2 times"#)); encoder.consumer().batch_end(); @@ -1652,7 +1652,7 @@ fn test_non_unique_keys() { let zset = OrdIndexedZSet::from_tuples((), vec![Tup2(Tup2(k1.clone(), v1.clone()), -2)]); let zset = Arc::new(>::new(zset)) as Arc; - encoder.consumer().batch_start(0); + encoder.consumer().batch_start(0, 0); let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err(); assert!(err.to_string().contains(r#"is deleted 2 times"#)); encoder.consumer().batch_end(); @@ -1666,7 +1666,7 @@ fn test_non_unique_keys() { ); let zset = Arc::new(>::new(zset)) as Arc; - encoder.consumer().batch_start(0); + encoder.consumer().batch_start(0, 0); let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err(); println!("{err}"); assert!( @@ -1684,7 +1684,7 @@ fn test_non_unique_keys() { ); let zset = Arc::new(>::new(zset)) as Arc; - encoder.consumer().batch_start(0); + encoder.consumer().batch_start(0, 0); let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err(); println!("{err}"); assert!( @@ -1788,7 +1788,7 @@ fn run_avro_encoder_spine_snapshot_indexed( ) .unwrap(); - encoder.consumer().batch_start(0); + encoder.consumer().batch_start(0, 0); encoder.encode(snapshot).unwrap(); encoder.consumer().batch_end(); diff --git a/crates/adapters/src/format/json/output.rs b/crates/adapters/src/format/json/output.rs index 3f5013326b0..2fe0a1aeb85 100644 --- a/crates/adapters/src/format/json/output.rs +++ b/crates/adapters/src/format/json/output.rs @@ -674,7 +674,7 @@ mod test { }) .collect::>(); for (step, zset) in zsets.into_iter().enumerate() { - encoder.consumer().batch_start(step as u64); + encoder.consumer().batch_start(step as u64, 0); encoder.encode(zset.arc_as_batch_reader()).unwrap(); encoder.consumer().batch_end(); } diff --git a/crates/adapters/src/format/parquet/test.rs b/crates/adapters/src/format/parquet/test.rs index 127b6ce0047..62e6b9b67fd 100644 --- a/crates/adapters/src/format/parquet/test.rs +++ b/crates/adapters/src/format/parquet/test.rs @@ -156,7 +156,7 @@ fn parquet_output() { ); let zset = Arc::new(SerBatchImpl::<_, TestStruct2, ()>::new(zset)) as Arc; - encoder.consumer().batch_start(0); + encoder.consumer().batch_start(0, 0); encoder.encode(zset).unwrap(); encoder.consumer().batch_end(); diff --git a/crates/adapters/src/integrated/delta_table/output.rs b/crates/adapters/src/integrated/delta_table/output.rs index fc364063c7d..4fc1a041da5 100644 --- a/crates/adapters/src/integrated/delta_table/output.rs +++ b/crates/adapters/src/integrated/delta_table/output.rs @@ -35,6 +35,8 @@ use serde::Serialize; use serde_arrow::ArrayBuilder; use serde_arrow::schema::SerdeArrowSchema; use std::cmp::min; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use tokio::time::{Duration, sleep}; use tracing::{Instrument, info, info_span, warn}; @@ -62,6 +64,7 @@ struct DeltaTableWriterInner { key_schema: Option, value_schema: Relation, controller: Weak, + processed_records: AtomicUsize, } pub struct DeltaTableWriter { @@ -141,6 +144,7 @@ impl DeltaTableWriter { key_schema: key_schema.clone(), value_schema: value_schema.clone(), controller, + processed_records: AtomicUsize::new(0), }); // Create or open the delta table. // Panic safety: block_on() panics if called from a tokio async context. @@ -525,7 +529,15 @@ async fn stream_encode_and_write( while cursor.key_valid() { let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) - .map_err(WriteError::Deterministic)?; + .map_err(|e| { + if let Some(controller) = inner.controller.upgrade() { + let metadata = controller + .status + .metadata_as_of(inner.processed_records.load(Ordering::Relaxed) as u64); + println!("metadata: {:?}", metadata); + }; + WriteError::Deterministic(e) + })?; if let Some(op) = op { cursor.rewind_vals(); @@ -634,9 +646,12 @@ impl OutputConsumer for DeltaTableWriter { usize::MAX } - fn batch_start(&mut self, _step: Step) { + fn batch_start(&mut self, _step: Step, processed_records: usize) { self.pending_actions.clear(); self.num_rows = 0; + self.inner + .processed_records + .store(processed_records, Ordering::Relaxed); } fn push_buffer(&mut self, _buffer: &[u8], _num_records: usize) { @@ -1038,7 +1053,7 @@ mod parallel { } fn encode_batch(endpoint: &mut DeltaTableWriter, batch: &Arc) { - endpoint.consumer().batch_start(0); + endpoint.consumer().batch_start(0, 0); endpoint .encode(batch.clone().arc_as_batch_reader()) .unwrap(); @@ -1376,7 +1391,7 @@ mod parallel { // Make directory read-only to trigger write failure. std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap(); - endpoint.consumer().batch_start(0); + endpoint.consumer().batch_start(0, 0); let result = endpoint.encode(batch.arc_as_batch_reader()); // Restore permissions before asserting (so TempDir cleanup succeeds). @@ -1415,7 +1430,7 @@ mod parallel { // Make directory read-only to trigger write failure. std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap(); - endpoint.consumer().batch_start(0); + endpoint.consumer().batch_start(0, 0); let result = endpoint.encode(batch.arc_as_batch_reader()); // Restore permissions. diff --git a/crates/adapters/src/integrated/postgres/output.rs b/crates/adapters/src/integrated/postgres/output.rs index 6af829324a0..f217789a7b7 100644 --- a/crates/adapters/src/integrated/postgres/output.rs +++ b/crates/adapters/src/integrated/postgres/output.rs @@ -692,7 +692,7 @@ impl OutputConsumer for PostgresOutputEndpoint { self.config.max_buffer_size_bytes } - fn batch_start(&mut self, _step: Step) { + fn batch_start(&mut self, _step: Step, _processed_records: usize) { self.txn_start = std::time::Instant::now(); match self.broadcast_and_collect(BroadcastCommand::BatchStart) { @@ -1272,7 +1272,7 @@ mod tests { } fn encode_batch(endpoint: &mut PostgresOutputEndpoint, batch: &Arc) { - endpoint.consumer().batch_start(0); + endpoint.consumer().batch_start(0, 0); endpoint .encode(batch.clone().arc_as_batch_reader()) .unwrap(); diff --git a/crates/adapters/src/test/mock_output_consumer.rs b/crates/adapters/src/test/mock_output_consumer.rs index 6fb0d658d57..06b08ddef78 100644 --- a/crates/adapters/src/test/mock_output_consumer.rs +++ b/crates/adapters/src/test/mock_output_consumer.rs @@ -67,7 +67,7 @@ impl OutputConsumer for MockOutputConsumer { self.max_buffer_size_bytes } - fn batch_start(&mut self, _step: Step) {} + fn batch_start(&mut self, _step: Step, _processed_records: usize) {} fn push_buffer(&mut self, buffer: &[u8], _num_records: usize) { self.data .lock()