diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 5143cf25f68..2c97597b64a 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -2491,6 +2491,10 @@ struct CircuitThread { input_metadata: HashMap>, commit_updates: Option, + + /// Set to true on startup if the circuit requires bootstrapping. + /// Cleared when the circuit completes bootstrapping. + bootstrapping: bool, } struct CommitUpdates { @@ -2759,9 +2763,9 @@ impl CircuitThread { incarnation_uuid, )?; - controller - .status - .set_bootstrap_in_progress(circuit.bootstrap_in_progress()); + let bootstrapping = circuit.bootstrap_in_progress(); + + controller.status.set_bootstrap_in_progress(bootstrapping); let input_metadata = input_metadata.map(|input_metadata| { input_metadata @@ -2777,7 +2781,7 @@ impl CircuitThread { // The pipeline hasn't changed based on input and output persistent id values, // yet the circuit is bootstrapping. This is a bug. - if can_replay && circuit.bootstrap_in_progress() { + if can_replay && bootstrapping { return Err(ControllerError::UnexpectedBootstrap { bootstrap_info: circuit.bootstrap_info().clone(), }); @@ -2798,10 +2802,10 @@ impl CircuitThread { }?; // The above code ensures that replay and bootstrapping cannot happen at the same time. - assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress())); + assert!(!(ft.is_replaying() && bootstrapping)); // Disable journaling while we're bootstrapping the circuit. - if circuit.bootstrap_in_progress() { + if bootstrapping { ft.disable(); } @@ -2829,6 +2833,7 @@ impl CircuitThread { checkpoint_sender, input_metadata: input_metadata.unwrap_or_default(), commit_updates: None, + bootstrapping, }) } @@ -2857,10 +2862,7 @@ impl CircuitThread { // so that if the first step() we perform below before entering the loop // ends up finishing bootstrapping, we will still perform an extra step to initialize // the output table snapshots inside the loop. - let mut trigger = StepTrigger::new( - self.controller.clone(), - self.circuit.bootstrap_in_progress(), - ); + let mut trigger = StepTrigger::new(self.controller.clone()); if config.global.cpu_profiler { self.circuit.enable_cpu_profiler().unwrap_or_else(|e| { error!("Failed to enable CPU profiler: {e}"); @@ -2875,7 +2877,7 @@ impl CircuitThread { // // Skip this during bootstrap to avoid a slow first step. We don't guarantee // that view snapshots are up-to-date until bootstrap is complete. - if !self.circuit.bootstrap_in_progress() + if !self.bootstrapping && let Err(error) = self.step() { let _ = init_status_sender.send(Err(error)); @@ -2949,7 +2951,11 @@ impl CircuitThread { self.last_checkpoint(), self.last_checkpoint_sync(), self.replaying(), - self.circuit.bootstrap_in_progress(), + // `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping is complete, + // which is required to initialize the output snapshots. + // We want the trigger to trigger that extra transaction; therefore we pass `status.bootstrap_in_progress` + // rather than `self.bootstrapping` here. + self.controller.status.bootstrap_in_progress(), self.checkpoint_requested(), self.sync_checkpoint_requested(), coordination_request, @@ -3022,11 +3028,6 @@ impl CircuitThread { transaction_state.into_coordination_status(), )); - // If bootstrapping has completed, update the status flag. - self.controller - .status - .set_bootstrap_in_progress(self.circuit.bootstrap_in_progress()); - // Update `trace_snapshot` to the latest traces. // // We do this before updating `total_processed_records` so that ad hoc @@ -3034,10 +3035,30 @@ impl CircuitThread { // processing; otherwise, there is a race for any code that runs a query // as soon as input has been processed. if transaction_state == TransactionState::None { - Span::new("update") - .with_category("Step") - .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step)) - .in_scope(|| self.update_snapshot()); + // Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction). + // This guarantees that: + // 1. Ad hoc queries observe a consistent view of the data. + // 2. Ad hoc snapshots are up-to-date before the pipeline is marked as running. + if !self.controller.status.bootstrap_in_progress() { + Span::new("update") + .with_category("Step") + .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step)) + .in_scope(|| self.update_snapshot()); + } + + let bootstrapping = self.circuit.bootstrap_in_progress(); + + // If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag + // until the circuit performs an extra transaction to initialize output snapshots + // (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()` + // is true). + if self.bootstrapping && !bootstrapping { + self.bootstrapping = false; + } else { + self.controller + .status + .set_bootstrap_in_progress(bootstrapping); + } } // Record that we've processed the records, unless there is a transaction in progress, @@ -4243,10 +4264,6 @@ struct StepTrigger { /// Time between automatic checkpoint syncs. sync_interval: Option, - - /// The circuit is bootstrapping. Used to detect the transition from bootstrapping - /// to normal mode. - bootstrapping: bool, } /// Action for the controller to take. @@ -4267,7 +4284,7 @@ enum Action { impl StepTrigger { /// Returns a new [StepTrigger]. - fn new(controller: Arc, bootstrapping: bool) -> Self { + fn new(controller: Arc) -> Self { let config = &controller.status.pipeline_config.global; let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs); let min_batch_size_records = config.min_batch_size_records; @@ -4287,7 +4304,6 @@ impl StepTrigger { max_buffering_delay, min_batch_size_records, checkpoint_interval, - bootstrapping, sync_interval, } } @@ -4349,16 +4365,7 @@ impl StepTrigger { } _ => Some(Action::Park(None)), } - } else if replaying - || self.controller.transaction_commit_requested() - || bootstrapping - || self.bootstrapping - { - // The `self.bootstrapping` condition above detects a transition - // from bootstrapping to normal operation and makes sure that the - // circuit performs an extra step in the normal mode in order to - // initialize output table snapshots of output relations that did - // not participate in bootstrapping. + } else if replaying || self.controller.transaction_commit_requested() || bootstrapping { Some(Action::Step) } else if timer_expired(next_checkpoint, now) && !checkpoint_requested { Some(Action::Checkpoint) @@ -4425,7 +4432,6 @@ impl StepTrigger { } }; - self.bootstrapping = bootstrapping; if result == Action::Step { self.buffer_timeout = None; } diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index 619bb2d0a94..97a67cbd18e 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -1497,10 +1497,20 @@ pub(crate) fn register_replay_stream( // We currently only support using operators in the top-level circuit // as replay sources. if TypeId::of::<()>() == TypeId::of::() { - circuit.cache_insert( - ReplaySource::new(stream.stream_id()), - Box::new(replay_stream.clone()), - ); + // If a replay source already exists, don't overwrite it. This normally shouldn't + // happen as we should not have more than one integral for each stream. One situation + // where this does happen today is for input streams that have an integral without + // an accumulator as part of input_upsert, and another integral with an accumulator + // created by a downstream join or aggregate. In this case, we want to use the former + // for replay, as the latter may have been added in the new version of the program + // and may be empty, while the former can have state (conversely, if the input integral + // is empty, the downstream integral is guaranteed to be empty too). + if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) { + circuit.cache_insert( + ReplaySource::new(stream.stream_id()), + Box::new(replay_stream.clone()), + ); + } } } @@ -7597,10 +7607,15 @@ impl CircuitHandle { return true; }; - replay_info.replay_sources.keys().all(|node_id| { + // Bootstrapping is finished when all replay sources have completed their replay and the + // transaction has been committed. + + let all_complete = replay_info.replay_sources.keys().all(|node_id| { self.circuit .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete()) - }) + }); + + all_complete && self.is_commit_complete() } /// Finalize the replay phase of the circuit. diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 42a277fb32e..2ae439dfd82 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -1397,20 +1397,6 @@ impl DBSPHandle { Ok(progress) } - pub fn set_replay_step_size(&mut self, step_size: usize) { - if let Some(handle) = self.runtime.as_ref() { - handle.runtime().set_replay_step_size(step_size); - } - } - - pub fn get_replay_step_size(&self) -> usize { - if let Some(handle) = self.runtime.as_ref() { - handle.runtime().get_replay_step_size() - } else { - 0 - } - } - /// The circuit has been resumed from a checkpoint and is currently bootstrapping the modified part of the circuit. pub fn bootstrap_in_progress(&self) -> bool { self.bootstrap_info.is_some() diff --git a/crates/dbsp/src/circuit/replay_tests.rs b/crates/dbsp/src/circuit/replay_tests.rs index 37cd54f995b..a230329c476 100644 --- a/crates/dbsp/src/circuit/replay_tests.rs +++ b/crates/dbsp/src/circuit/replay_tests.rs @@ -1,9 +1,10 @@ use feldera_types::config::StorageConfig; use crate::{ - CmpFunc, DBData, OrdZSet, OutputHandle, RootCircuit, Runtime, Stream, ZSetHandle, ZWeight, + CmpFunc, DBData, IndexedZSetHandle, OrdIndexedZSet, OrdZSet, OutputHandle, RootCircuit, + Runtime, Stream, ZSetHandle, ZWeight, circuit::dbsp_handle::CircuitStorageConfig, - default_hash, + default_hash, indexed_zset, operator::{ Max, Min, time_series::{RelOffset, RelRange}, @@ -148,52 +149,30 @@ type CircuitFn = Arc< ///``` /// /// The common part of the two circuits must return identical results. -fn test_replay( - circuit_constructor1: CircuitFn, - circuit_constructor2: CircuitFn, - inputs1: Vec, - inputs2_1: Vec, - inputs2_2: Vec, - inputs3: Vec, -) where - I1: TestDataType, - I2: TestDataType, - I3: TestDataType, - O1: TestDataType, - O2: TestDataType, - O3: TestDataType, -{ - // Run with replay step size < splitter chunk size. - test_replay_with_step_size::( - circuit_constructor1.clone(), - circuit_constructor2.clone(), - inputs1.clone(), - inputs2_1.clone(), - inputs2_2.clone(), - inputs3.clone(), - Some(1), - ); - // Run without replay step size > splitter chunk size. - test_replay_with_step_size::( - circuit_constructor1, - circuit_constructor2, - inputs1, - inputs2_1, - inputs2_2, - inputs3, - None, - ); +fn circuit_config(path: &PathBuf) -> CircuitConfig { + CircuitConfig::with_workers(NUM_WORKERS) + .with_splitter_chunk_size_records(2) + .with_mode(Mode::Persistent) + .with_storage(Some( + CircuitStorageConfig::for_config( + StorageConfig { + path: path.to_string_lossy().into_owned(), + cache: Default::default(), + }, + Default::default(), + ) + .unwrap(), + )) } -fn test_replay_with_step_size( +fn test_replay( circuit_constructor1: CircuitFn, circuit_constructor2: CircuitFn, inputs1: Vec, inputs2_1: Vec, inputs2_2: Vec, inputs3: Vec, - replay_step_size: Option, ) where I1: TestDataType, I2: TestDataType, @@ -210,22 +189,6 @@ fn test_replay_with_step_size( let path = tempfile::tempdir().unwrap().keep(); println!("Running replay_test in {}", path.display()); - fn circuit_config(path: &PathBuf) -> CircuitConfig { - CircuitConfig::with_workers(NUM_WORKERS) - .with_splitter_chunk_size_records(2) - .with_mode(Mode::Persistent) - .with_storage(Some( - CircuitStorageConfig::for_config( - StorageConfig { - path: path.to_string_lossy().into_owned(), - cache: Default::default(), - }, - Default::default(), - ) - .unwrap(), - )) - } - // Create both reference circuits, feed I1 and I2 to circuit1; feed I2 and I3 to circuit2. let mut reference_output1 = Vec::new(); let mut reference_output2 = Vec::new(); @@ -343,10 +306,6 @@ fn test_replay_with_step_size( }) .unwrap(); - if let Some(replay_step_size) = replay_step_size { - circuit.set_replay_step_size(replay_step_size); - } - while circuit.bootstrap_in_progress() { circuit.transaction().unwrap(); } @@ -1690,3 +1649,252 @@ fn test_rolling_circuit() { std::iter::repeat_n((), 20).collect(), ); } + +// Regression test: +// +// Pipeline 1: +// ---> input_map +// +// Pipeline 2: +// ---> input_map ---> aggregate --> output +// +// The second pipeline should replay the input from the input_map operator. +// A bug prevented this from happening, because the integral built by the +// aggregate operator was used to replay instead. + +#[test] +fn regression1() { + init_test_logger(); + + let path = tempfile::tempdir().unwrap().keep(); + + let (mut circuit1, input_handle1) = + Runtime::init_circuit(circuit_config(&path), move |circuit| { + let (input_stream, input_handle) = circuit + .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); + input_stream.set_persistent_id(Some("input_map")); + Ok(input_handle) + }) + .unwrap(); + + input_handle1.push(0, crate::operator::Update::Insert(0)); + + circuit1.transaction().unwrap(); + + // Checkpoint. + let checkpoint = circuit1.checkpoint().run().unwrap(); + circuit1.kill().unwrap(); + + // Restart the second circuit from the checkpoint. + let mut circuit_config = circuit_config(&path); + circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid); + + let (mut circuit2, (_input_handle2, output_handle2)) = + Runtime::init_circuit(circuit_config, move |circuit| { + let (input_stream, input_handle) = circuit + .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); + input_stream.set_persistent_id(Some("input_map")); + + let aggregate = input_stream + .aggregate_persistent(Some("aggregate1"), Max) + .set_persistent_id(Some("aggregate1")); + + let output_handle = aggregate + .accumulate_trace() + .apply(|trace| trace.ro_snapshot().consolidate()) + .output_persistent(Some("output")); + Ok((input_handle, output_handle)) + }) + .unwrap(); + + while circuit2.bootstrap_in_progress() { + circuit2.transaction().unwrap(); + } + println!("Replay finished"); + + let actual_output = &output_handle2.concat().consolidate(); + + // The bug causes the output to be empty. + assert_eq!(actual_output, &indexed_zset!(0 => {0 => 1})); +} + +/// Unit test for the replay behavior of Z1Trace and AccumulateZ1Trace operators. +/// Operators must correctly replay their contents during bootstrap as one atomic transaction. + +#[derive(Clone, Copy, Debug)] +enum ReplayTraceKind { + IntegrateTrace, + AccumulateTrace, +} + +type IndexedReplayBatch = Vec>>; + +fn add_replay_trace( + stream: &Stream>, + trace_kind: ReplayTraceKind, +) { + match trace_kind { + ReplayTraceKind::IntegrateTrace => { + stream.integrate_trace(); + } + ReplayTraceKind::AccumulateTrace => { + stream.accumulate_trace(); + } + } +} + +fn transactional_bootstrap_circuit1( + circuit: &mut RootCircuit, + trace_kind: ReplayTraceKind, +) -> IndexedZSetHandle { + let (input_stream, input_handle) = circuit.add_input_indexed_zset::(); + input_stream.set_persistent_id(Some("input")); + add_replay_trace(&input_stream, trace_kind); + input_handle +} + +fn transactional_bootstrap_circuit2( + circuit: &mut RootCircuit, + trace_kind: ReplayTraceKind, +) -> ( + IndexedZSetHandle, + OutputHandle>>, +) { + let (input_stream, input_handle) = circuit.add_input_indexed_zset::(); + input_stream.set_persistent_id(Some("input")); + add_replay_trace(&input_stream, trace_kind); + + let output_handle = input_stream.accumulate_output_persistent(Some("output")); + + (input_handle, output_handle) +} + +fn replay_batch_to_indexed_zset(batches: &[IndexedReplayBatch]) -> OrdIndexedZSet { + OrdIndexedZSet::from_tuples( + (), + batches + .iter() + .flatten() + .map(|Tup2(key, Tup2(value, weight))| Tup2(Tup2(*key, *value), *weight)) + .collect(), + ) +} + +fn run_transactional_bootstrap_test( + trace_kind: ReplayTraceKind, + batches: Vec, + expect_multistep_replay: bool, +) { + init_test_logger(); + + let path = tempfile::tempdir().unwrap().keep(); + let expected = replay_batch_to_indexed_zset(&batches); + + let checkpoint = { + let (mut circuit, input_handle) = + Runtime::init_circuit(circuit_config(&path), move |circuit| { + Ok(transactional_bootstrap_circuit1(circuit, trace_kind)) + }) + .unwrap(); + + for mut batch in batches.clone() { + input_handle.append(&mut batch); + circuit.transaction().unwrap(); + } + + let checkpoint = circuit.checkpoint().run().unwrap(); + circuit.kill().unwrap(); + checkpoint + }; + + let mut circuit_config = circuit_config(&path); + circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid); + + let (mut circuit, (_input_handle, output_handle)) = + Runtime::init_circuit(circuit_config, move |circuit| { + Ok(transactional_bootstrap_circuit2(circuit, trace_kind)) + }) + .unwrap(); + + assert_eq!(output_handle.num_nonempty_mailboxes(), 0); + + if circuit.bootstrap_in_progress() { + circuit.start_transaction().unwrap(); + circuit.start_commit_transaction().unwrap(); + + let mut incomplete_commit_steps = 0; + loop { + let commit_complete = circuit.step().unwrap(); + if commit_complete { + break; + } + + incomplete_commit_steps += 1; + } + + if expect_multistep_replay { + assert!( + incomplete_commit_steps > 0, + "{trace_kind:?} replay finished in a single commit step despite the splitter chunk size" + ); + } + } + + assert!(!circuit.bootstrap_in_progress()); + assert_eq!(output_handle.concat().consolidate(), expected); + + circuit.kill().unwrap(); +} + +fn transactional_bootstrap_cases() -> Vec<(Vec, bool)> { + vec![ + (vec![], false), + (vec![vec![Tup2(1, Tup2(10, 1))]], false), + ( + vec![vec![Tup2(1, Tup2(10, 1)), Tup2(1, Tup2(11, 1))]], + false, + ), + ( + vec![ + vec![ + Tup2(1, Tup2(10, 1)), + Tup2(1, Tup2(11, 1)), + Tup2(2, Tup2(20, 1)), + ], + vec![ + Tup2(1, Tup2(11, -1)), + Tup2(1, Tup2(12, 1)), + Tup2(4, Tup2(40, 2)), + Tup2(5, Tup2(50, 2)), + Tup2(6, Tup2(50, 2)), + Tup2(7, Tup2(50, 2)), + Tup2(8, Tup2(50, 2)), + Tup2(9, Tup2(50, 2)), + ], + ], + true, + ), + ] +} + +#[test] +fn test_integrate_trace_bootstrap_is_transactional() { + for (batches, expect_multistep_replay) in transactional_bootstrap_cases() { + run_transactional_bootstrap_test( + ReplayTraceKind::IntegrateTrace, + batches, + expect_multistep_replay, + ); + } +} + +#[test] +fn test_accumulate_trace_bootstrap_is_transactional() { + for (batches, expect_multistep_replay) in transactional_bootstrap_cases() { + run_transactional_bootstrap_test( + ReplayTraceKind::AccumulateTrace, + batches, + expect_multistep_replay, + ); + } +} diff --git a/crates/dbsp/src/circuit/runtime.rs b/crates/dbsp/src/circuit/runtime.rs index 33b760126af..37cfdba62b0 100644 --- a/crates/dbsp/src/circuit/runtime.rs +++ b/crates/dbsp/src/circuit/runtime.rs @@ -64,9 +64,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use typedmap::TypedDashMap; -/// The number of tuples a stateful operator outputs per step during replay. -pub const DEFAULT_REPLAY_STEP_SIZE: usize = 10000; - #[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub enum Error { /// Specified persistent id is not found in the circuit. @@ -304,7 +301,6 @@ struct RuntimeInner { /// Panic info collected from failed worker threads. panic_info: Vec>>>, panicked: AtomicBool, - replay_step_size: AtomicUsize, /// Tokio runtime that runs async merger tasks (see `AsyncMerger`). tokio_merger_runtime: Mutex>, @@ -511,7 +507,6 @@ impl RuntimeInner { .map(|_| EnumMap::from_fn(|_| RwLock::new(None))) .collect(), panicked: AtomicBool::new(false), - replay_step_size: AtomicUsize::new(DEFAULT_REPLAY_STEP_SIZE), tokio_merger_runtime: Mutex::new(None), exchange_listener: Mutex::new(config.exchange_listener), }) @@ -1063,29 +1058,6 @@ impl Runtime { self.inner().step_size } - /// Configure the number of tuples a stateful operator outputs per step during replay. - /// - /// The default is `DEFAULT_REPLAY_STEP_SIZE`. - pub fn set_replay_step_size(&self, step_size: usize) { - self.inner() - .replay_step_size - .store(step_size, Ordering::Release); - } - - /// Get currently configured replay step size. - /// - /// Returns `DEFAULT_REPLAY_STEP_SIZE` if the current thread doesn't have a runtime. - pub fn replay_step_size() -> usize { - RUNTIME - .with(|rt| Some(rt.borrow().as_ref()?.get_replay_step_size())) - .unwrap_or(DEFAULT_REPLAY_STEP_SIZE) - } - - /// Get currently configured replay step size. - pub fn get_replay_step_size(&self) -> usize { - self.inner().replay_step_size.load(Ordering::Acquire) - } - /// Returns the worker index as a string. /// /// This is useful for metric labels. diff --git a/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs b/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs index d631e57752f..75581b41221 100644 --- a/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs +++ b/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs @@ -147,9 +147,6 @@ impl Notifications { } enum TransactionPhase { - /// Not started - Idle, - /// Started, but not yet committing. Started, @@ -360,7 +357,7 @@ impl Inner { notifications: Notifications::new(num_async_nodes), handles: JoinSet::new(), waiting: false, - transaction_phase: TransactionPhase::Idle, + transaction_phase: TransactionPhase::CommitComplete, global_commit_consensus: Broadcast::new(), metadata_broadcast: Broadcast::new(), before_first_step: true, diff --git a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs index 829be69c111..0fef3ebbf65 100644 --- a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs +++ b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs @@ -1,6 +1,6 @@ -use crate::Runtime; use crate::circuit::circuit_builder::{StreamId, register_replay_stream}; use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS}; +use crate::circuit::splitter_output_chunk_size; use crate::dynamic::{Factory, Weight, WeightTrait}; use crate::operator::dynamic::trace::{DelayedTraceId, TraceBounds}; use crate::operator::{TraceBound, require_persistent_id}; @@ -1117,7 +1117,7 @@ where { fn get_output(&mut self) -> T { //println!("Z1-{}::get_output", &self.global_id); - let replay_step_size = Runtime::replay_step_size(); + let replay_step_size = splitter_output_chunk_size(); if self.replay_mode { // One output per transaction. @@ -1161,6 +1161,7 @@ where self.delta_stream.as_ref().unwrap().value().put(batch); if !replay.borrow_cursor().key_valid() { self.replay_state = None; + self.flush_output = false; } } else { // Continue producing empty outputs as long as the circuit is in the replay mode. @@ -1169,11 +1170,12 @@ where .unwrap() .value() .put(B::dyn_empty(&self.batch_factories)); + self.flush_output = false; } + } else { + self.flush_output = false; } - self.flush_output = false; - let mut result = self.trace.take().unwrap(); result.clear_dirty_flag(); result diff --git a/crates/dbsp/src/operator/dynamic/trace.rs b/crates/dbsp/src/operator/dynamic/trace.rs index 2d90686a3f2..85270e14630 100644 --- a/crates/dbsp/src/operator/dynamic/trace.rs +++ b/crates/dbsp/src/operator/dynamic/trace.rs @@ -1,6 +1,6 @@ -use crate::Runtime; use crate::circuit::circuit_builder::{StreamId, register_replay_stream}; use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS}; +use crate::circuit::splitter_output_chunk_size; use crate::dynamic::{Factory, Weight, WeightTrait}; use crate::operator::require_persistent_id; use crate::trace::spine_async::WithSnapshot; @@ -1184,7 +1184,7 @@ where { fn get_output(&mut self) -> T { //println!("Z1-{}::get_output", &self.global_id); - let replay_step_size = Runtime::replay_step_size(); + let replay_step_size = splitter_output_chunk_size(); if self.replay_mode { // One output per transaction. @@ -1228,6 +1228,7 @@ where self.delta_stream.as_ref().unwrap().value().put(batch); if !replay.borrow_cursor().key_valid() { self.replay_state = None; + self.flush_output = false; } } else { // Continue producing empty outputs as long as the circuit is in the replay mode. @@ -1236,11 +1237,12 @@ where .unwrap() .value() .put(B::dyn_empty(&self.batch_factories)); + self.flush_output = false; } + } else { + self.flush_output = false; } - self.flush_output = false; - let mut result = self.trace.take().unwrap(); result.clear_dirty_flag(); result diff --git a/python/feldera/testutils.py b/python/feldera/testutils.py index 1f4fd45150e..ba8d4b5c772 100644 --- a/python/feldera/testutils.py +++ b/python/feldera/testutils.py @@ -264,6 +264,7 @@ def build_pipeline( tables: dict, views: List[ViewSpec], resources: Optional[Resources] = None, + dev_tweaks: Optional[dict] = None, ) -> Pipeline: sql = generate_program(tables, views) @@ -277,6 +278,7 @@ def build_pipeline( resources=resources, workers=FELDERA_TEST_NUM_WORKERS, hosts=FELDERA_TEST_NUM_HOSTS, + dev_tweaks=dev_tweaks, ), ).create_or_replace() @@ -319,6 +321,25 @@ def transaction(pipeline: Pipeline, duration_seconds: int): log(f"Transaction committed in {time.monotonic() - commit_start} seconds") +def transaction_num_records(pipeline: Pipeline, num_records: int): + """Run a transaction until it ingests a record count or reaches end of input.""" + + log(f"Running transaction for {num_records} records or end of input") + initial_records = number_of_processed_records(pipeline) + pipeline.start_transaction() + + while not check_end_of_input(pipeline): + processed_records = number_of_processed_records(pipeline) - initial_records + if processed_records >= num_records: + break + time.sleep(3) + + log("Committing transaction") + commit_start = time.monotonic() + pipeline.commit_transaction() + log(f"Transaction committed in {time.monotonic() - commit_start} seconds") + + def checkpoint_pipeline(pipeline: Pipeline): """Create a checkpoint and wait for it to complete.""" @@ -362,6 +383,12 @@ def number_of_processed_records(pipeline: Pipeline) -> int: return pipeline.stats().global_metrics.total_processed_records +def number_of_input_records(pipeline: Pipeline) -> int: + """Get the total_input_records metric.""" + + return pipeline.stats().global_metrics.total_input_records + + def run_workload( pipeline_name: str, tables: dict, diff --git a/python/tests/runtime/test_output_snapshot.py b/python/tests/runtime/test_output_snapshot.py index 4a4d9215089..826f627041b 100644 --- a/python/tests/runtime/test_output_snapshot.py +++ b/python/tests/runtime/test_output_snapshot.py @@ -120,11 +120,12 @@ def _delta_connector( location: DeltaTestLocation, *, send_snapshot: bool, + name: str = "delta_out", index: str | None = None, ) -> dict: """Build a `delta_table_output` connector config.""" connector: dict = { - "name": "delta_out", + "name": name, "send_snapshot": send_snapshot, "transport": { "name": "delta_table_output", @@ -185,6 +186,58 @@ def _build_sql(locations: list[DeltaTestLocation], send_snapshot: bool) -> str: return "\n\n".join(parts) +def _build_sql_round3( + locations: list[DeltaTestLocation], + delta1_location: DeltaTestLocation, + delta2_location: DeltaTestLocation, + delta3_location: DeltaTestLocation, +) -> str: + """Render SQL for round 3. + + The original connectors remain unchanged with ``send_snapshot=true``. + The new view forces bootstrapping. ``delta2`` and ``delta3`` are added + to an existing view with snapshot enabled and disabled, respectively. + """ + parts = [ + "CREATE TABLE t1(id INT NOT NULL, n BIGINT NOT NULL, s VARCHAR NOT NULL)\n" + " WITH ('materialized' = 'true');", + ] + for (label, view, index, indexes_sql), loc in zip(_VIEWS, locations): + connectors = [ + _delta_connector(loc, send_snapshot=True, index=index), + ] + if label == "v_plain": + connectors.extend( + [ + _delta_connector( + delta2_location, + send_snapshot=True, + name="delta2", + ), + _delta_connector( + delta3_location, + send_snapshot=False, + name="delta3", + ), + ] + ) + parts.append( + f"CREATE MATERIALIZED VIEW {view} WITH (\n" + f" 'connectors' = '{json.dumps(connectors)}'\n" + f") AS SELECT * FROM t1;" + ) + if indexes_sql: + parts.append(indexes_sql) + + delta1 = _delta_connector(delta1_location, send_snapshot=False, name="delta1") + parts.append( + "CREATE MATERIALIZED VIEW v_added WITH (\n" + f" 'connectors' = '{json.dumps([delta1])}'\n" + ") AS SELECT * FROM t1;" + ) + return "\n\n".join(parts) + + @skip_on_arm64 # https://github.com/delta-io/delta-rs/issues/4413 def test_delta_output_send_snapshot_after_flag_flip(): """Verify snapshot delivery to delta sinks across a connector @@ -215,6 +268,13 @@ def test_delta_output_send_snapshot_after_flag_flip(): proves the initial snapshot delivered the full materialized-view contents — for every combination of indexes and which index the connector reads through. + + Round 3 adds a new view with a new Delta connector (``delta1``) and + adds two Delta connectors to an existing view: ``delta2`` with + ``send_snapshot=true`` and ``delta3`` with ``send_snapshot=false``. + Adding the view forces actual bootstrapping. ``delta1`` and + ``delta2`` should be populated during startup, while ``delta3`` + should remain empty until future deltas arrive. """ pipeline_name = unique_pipeline_name( "test_delta_output_send_snapshot_after_flag_flip" @@ -222,6 +282,9 @@ def test_delta_output_send_snapshot_after_flag_flip(): locations = [ DeltaTestLocation.create(f"{pipeline_name}_{label}") for label, *_ in _VIEWS ] + delta1_location = DeltaTestLocation.create(f"{pipeline_name}_delta1") + delta2_location = DeltaTestLocation.create(f"{pipeline_name}_delta2") + delta3_location = DeltaTestLocation.create(f"{pipeline_name}_delta3") expected = sorted_rows(_ROWS) # Round 1: send_snapshot=false. Push three rows; they land via the @@ -272,3 +335,40 @@ def test_delta_output_send_snapshot_after_flag_flip(): for (_, *_), loc in zip(_VIEWS, locations): assert sorted_rows(loc.read_rows()) == expected + + pipeline.checkpoint(wait=True) + pipeline.stop(force=True) + + # Round 3: add a new view and new output connectors to an existing view. + # The new view forces bootstrapping. The connector on the new view and + # the existing-view connector with send_snapshot=true are populated during + # startup; the existing-view connector with send_snapshot=false is not. + TEST_CLIENT.patch_pipeline( + name=pipeline_name, + sql=_build_sql_round3( + locations, + delta1_location, + delta2_location, + delta3_location, + ), + runtime_config=RuntimeConfig( + workers=FELDERA_TEST_NUM_WORKERS, + storage=Storage(config={"start_from_checkpoint": "latest"}), + ).to_dict(), + ) + pipeline = Pipeline.get(pipeline_name, TEST_CLIENT) + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) + + wait_for_condition( + "round 3: delta1 on new bootstrapped view receives rows", + lambda: sorted_rows(delta1_location.read_rows()) == expected, + timeout_s=60.0, + poll_interval_s=1.0, + ) + wait_for_condition( + "round 3: delta2 on existing view receives snapshot rows", + lambda: sorted_rows(delta2_location.read_rows()) == expected, + timeout_s=60.0, + poll_interval_s=1.0, + ) + assert delta3_location.read_rows() == [] diff --git a/python/tests/workloads/test_tpch.py b/python/tests/workloads/test_tpch.py index d68d3dccc70..2d4bc5ec469 100644 --- a/python/tests/workloads/test_tpch.py +++ b/python/tests/workloads/test_tpch.py @@ -1,20 +1,24 @@ import sys import time import unittest +from feldera.enums import BootstrapPolicy from feldera.pipeline import Pipeline from feldera.testutils import ( IndexSpec, ViewSpec, build_pipeline, check_for_endpoint_errors, + check_end_of_input, checkpoint_pipeline, generate_program, log, number_of_processed_records, + number_of_input_records, run_workload, - transaction, + transaction_num_records, unique_pipeline_name, validate_outputs, + wait_end_of_input, ) import tempfile import os @@ -32,6 +36,8 @@ def __init__( s3_path: Optional[str] = None, s3_region: Optional[str] = None, input_dir: Optional[str] = None, + segment_size: Optional[int] = None, + num_segments: Optional[int] = None, ): self.mode = mode @@ -39,6 +45,12 @@ def __init__( raise ValueError(f"Unknown mode: {mode}") self.input_mode = input_mode + self.segment_size = segment_size + self.num_segments = num_segments + + if self.num_segments is not None: + if self.num_segments <= 0 or self.num_segments > 20: + raise ValueError("num_segments must be between 1 and 20") if self.input_mode == "file": self.input_dir = input_dir @@ -117,6 +129,22 @@ def run_cli(): help="S3 bucket region. Required if --s3-bucket or --s3-path is specified.", ) + parser.add_argument( + "--num-segments", + type=int, + nargs="?", + default=None, + help="Number of test segments. Only used in checkpoint mode. The test divides all views in the benchmark into this many groups and adds one group of views per segment to the pipeline.", + ) + + parser.add_argument( + "--segment-size", + type=int, + nargs="?", + default=None, + help="Approximate number of records ingested per segment. Only used in checkpoint mode.", + ) + args = parser.parse_args() if sum(x is not None for x in (args.s3_bucket, args.s3_path, args.input_dir)) > 1: @@ -131,6 +159,7 @@ def run_cli(): elif args.s3_bucket: input_mode = "s3" s3_region = args.s3_region + s3_path = None elif args.s3_path: input_mode = "delta" s3_path = args.s3_path @@ -153,6 +182,8 @@ def run_cli(): s3_region=s3_region, s3_path=s3_path, input_dir=args.input_dir, + segment_size=args.segment_size, + num_segments=args.num_segments, ) tpch_test(config) @@ -1297,24 +1328,34 @@ def tpch_test_segment( pipeline: Pipeline, tables: dict, views: List[ViewSpec], - expected_processed_records, + last_processed: int, segment_size: int, -) -> tuple[bool, int]: + previously_non_empty_views: List[str], +) -> tuple[bool, int, List[str]]: """Run a test segment. - Start the pipeline (from a checkpoint if one exists), run a series of transactions followed by streaming ingest periods, - until the pipeline processed segment_size records. A checkpoint is created halfway through the segment, or at the end. The - pipeline is then paused,outputs validated, and the pipeline stopped. + Start the pipeline (from a checkpoint if one exists), run a transaction to process segment_size records. + A checkpoint is created at the end of the segment. The pipeline is then paused, outputs validated, and + the pipeline stopped. + + previously_non_empty_views is a list of views that were non-empty before the start of the segment. + This is used to check that the views are still non-empty when the pipeline is restarted, i.e., + output snapshots are correctly populated by the bootstrapping process. """ # Start pipeline. start_time = time.monotonic() - log( - f"Starting pipeline to process {segment_size} records, starting from (approximately) {expected_processed_records} processed records" - ) - pipeline.start() + log(f"Starting pipeline to process {segment_size} records") + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) log(f"Pipeline started in {time.monotonic() - start_time} seconds") + for view_name in previously_non_empty_views: + count = view_row_count(pipeline, view_name) + if count == 0: + raise RuntimeError( + f"View {view_name} was non-empty before restart, but is empty after restart" + ) + # Current number of processed records. initial_processed_records = number_of_processed_records(pipeline) @@ -1322,56 +1363,25 @@ def tpch_test_segment( f"Initial processed records at the start of a segment: {initial_processed_records}" ) - if initial_processed_records < expected_processed_records: + if initial_processed_records < last_processed: raise RuntimeError( - f"Expected at least {expected_processed_records} processed records on startup, got {initial_processed_records}" + f"Expected at least {last_processed} processed records on startup, got {initial_processed_records}" ) - # Expected number of processed records after this segment. - expected_processed_records = initial_processed_records + segment_size - - # Make a checkpoint halfway through the segment after processing this many records. - halfway_processed_records = ( - initial_processed_records + expected_processed_records - ) >> 1 - - checkpoint = False + # Transaction + transaction_num_records(pipeline, segment_size) + check_for_endpoint_errors(pipeline) - while not pipeline.is_complete(): - current_processed_records = number_of_processed_records(pipeline) - log( - f"Processed {current_processed_records} total records so far (processed {current_processed_records - initial_processed_records} records in this segment)" - ) + processed_before_checkpoint = number_of_processed_records(pipeline) + non_empty_views = [ + view.name for view in views if view_row_count(pipeline, view.name) > 0 + ] + checkpoint_pipeline(pipeline) - if current_processed_records >= expected_processed_records: - log("Сompleting test segment") - break - - # Transaction - transaction(pipeline, 100) - check_for_endpoint_errors(pipeline) - - # Streaming ingest (no transaction) - log("Running streaming ingest for 10 seconds") - time.sleep(10) - check_for_endpoint_errors(pipeline) - - if not checkpoint: - processed_before_checkpoint = number_of_processed_records(pipeline) - - if processed_before_checkpoint >= halfway_processed_records: - log( - f"Creating checkpoint after processing {processed_before_checkpoint} records" - ) - checkpoint_pipeline(pipeline) - checkpoint = True - - if not checkpoint: - processed_before_checkpoint = number_of_processed_records(pipeline) - log( - f"Creating checkpoint at the end of the segment after processing {processed_before_checkpoint} records" - ) - checkpoint(pipeline) + # Streaming ingest (no transaction) + log("Running streaming ingest for 10 seconds") + time.sleep(10) + check_for_endpoint_errors(pipeline) pipeline.pause() @@ -1383,7 +1393,14 @@ def tpch_test_segment( log("Stopping pipeline") pipeline.stop(force=True) - return (complete, processed_before_checkpoint) + return (complete, processed_before_checkpoint, non_empty_views) + + +def view_row_count(pipeline: Pipeline, view_name: str) -> int: + escaped_view_name = view_name.replace('"', '""') + return next(pipeline.query(f'SELECT COUNT(*) as cnt FROM "{escaped_view_name}"'))[ + "cnt" + ] def tpch_test(config: TPCHTestConfig): @@ -1401,29 +1418,107 @@ def tpch_test(config: TPCHTestConfig): views = tpch_views(q_dirs) if config.mode == "checkpoint": + log("Starting checkpoint mode with all tables and no views") pipeline = build_pipeline( - unique_pipeline_name("tpc-h-checkpoint"), tables, views + unique_pipeline_name("tpc-h-checkpoint"), + tables, + [], + dev_tweaks={"adaptive_joins": True}, ) + if config.segment_size is not None: + segment_size = config.segment_size + else: + segment_size = 100_000_000 + + if config.num_segments is not None: + views_per_segment = ( + len(views) + config.num_segments - 1 + ) // config.num_segments + else: + views_per_segment = 5 last_processed = 0 - iteration = 1 - modified_views = views - while True: - (complete, last_processed) = tpch_test_segment( - pipeline, tables, modified_views, last_processed, 100000000 + complete = False + non_empty_views: List[str] = [] + + view_counts = list(range(views_per_segment, len(views) + 1, views_per_segment)) + if views and (not view_counts or view_counts[-1] != len(views)): + view_counts.append(len(views)) + + for view_count in view_counts: + modified_views = views[:view_count] + log( + f"Checkpoint view-add phase: adding {view_count}/{len(views)} views: " + f"{', '.join(view.name for view in modified_views)}" + ) + + sql = generate_program(tables, modified_views) + pipeline.modify(sql=sql) + log( + f"Running checkpoint segment with {view_count} views from {last_processed} processed records" + ) + + (complete, last_processed, non_empty_views) = tpch_test_segment( + pipeline, + tables, + modified_views, + last_processed, + segment_size, + non_empty_views, + ) + log( + f"Completed checkpoint view-add segment with complete={complete}, last_processed={last_processed}" ) if complete: break - iteration += 1 - modified_views = list( - map( - lambda view: view.clone_with_name(f"{view.name}_{iteration}"), views - ) - ) + # Test a different type of pipeline change: rename all views in the program. + modified_views = [ + view.clone_with_name(f"{view.name}_renamed") for view in views + ] + log(f"Renaming all views: {', '.join(view.name for view in modified_views)}") + + sql = generate_program(tables, modified_views) + pipeline.modify(sql=sql) + + # Process remaining data in one transaction. + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) + start_time = time.monotonic() + + try: + pipeline.start_transaction() + except Exception as e: + log(f"Error starting transaction: {e}") + + if config.segment_size is not None: + expected_inputs = number_of_input_records(pipeline) + config.segment_size + while number_of_input_records( + pipeline + ) < expected_inputs and not check_end_of_input(pipeline): + time.sleep(3) + else: + wait_end_of_input(pipeline, timeout_s=3600) - sql = generate_program(tables, modified_views) - pipeline.modify(sql=sql) + elapsed = time.monotonic() - start_time + log(f"Remaining data ingested in {elapsed}") + + start_time = time.monotonic() + try: + pipeline.commit_transaction(transaction_id=None, wait=True, timeout_s=None) + log(f"Commit took {time.monotonic() - start_time}") + except Exception as e: + log(f"Error committing transaction: {e}") + + pipeline.pause() + + # log("Waiting for outputs to flush") + # start_time = time.monotonic() + # pipeline.wait_for_completion(force_stop=False, timeout_s=3600) + # log(f"Flushing outputs took {time.monotonic() - start_time}") + + validate_outputs(pipeline, tables, modified_views) + + pipeline.stop(force=True) elif config.mode == "transaction": pipeline = run_workload(