From e0cbc1c5bda278f818c46dd5969cd3058ae7daa7 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Mon, 11 May 2026 11:54:45 -0700 Subject: [PATCH 1/8] [dbsp] Fix bootstrapping of tables with a PK. This fixes an issue when bootstrapping a table with a PK when there is a downstream operator attached to it that creates an integral of the same table. We ended up with two integrals that can both be used to replay the same stream, one with and one without an accumulator. We used to replay from the last registered replay source, which meant that if the second integral was added in the modified version of the program, it was empty and replay failed, despite the fact that the input integral could be used for replay. To make things worse, we report this error as the input table not being materialized, which is simply wrong. This commit adds a simle workaround that uses the first registered replay source (by refusing to register new replay sources for the same stream) and a regression test for this. Signed-off-by: Leonid Ryzhyk --- crates/dbsp/src/circuit/circuit_builder.rs | 18 +++- crates/dbsp/src/circuit/replay_tests.rs | 104 +++++++++++++++++---- 2 files changed, 101 insertions(+), 21 deletions(-) diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index 619bb2d0a9..27335278a8 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -1497,10 +1497,20 @@ pub(crate) fn register_replay_stream( // We currently only support using operators in the top-level circuit // as replay sources. if TypeId::of::<()>() == TypeId::of::() { - circuit.cache_insert( - ReplaySource::new(stream.stream_id()), - Box::new(replay_stream.clone()), - ); + // If a replay source already exists, don't overwrite it. This normally shouldn't + // happen as we should not have more than one integral for each stream. One situation + // where this does happen today is for input streams that have an integral without + // an accumulator as part of input_upsert, and another integral with an accumulator + // created by a downstream join or aggregate. In this case, we want to use the former + // for replay, as the latter may have been added in the new version of the program + // and may be empty, while the former can have state (conversely, if the input integral + // is empty, the downstream integral is guaranteed to be empty too). + if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) { + circuit.cache_insert( + ReplaySource::new(stream.stream_id()), + Box::new(replay_stream.clone()), + ); + } } } diff --git a/crates/dbsp/src/circuit/replay_tests.rs b/crates/dbsp/src/circuit/replay_tests.rs index 37cd54f995..3e9abb250e 100644 --- a/crates/dbsp/src/circuit/replay_tests.rs +++ b/crates/dbsp/src/circuit/replay_tests.rs @@ -3,7 +3,7 @@ use feldera_types::config::StorageConfig; use crate::{ CmpFunc, DBData, OrdZSet, OutputHandle, RootCircuit, Runtime, Stream, ZSetHandle, ZWeight, circuit::dbsp_handle::CircuitStorageConfig, - default_hash, + default_hash, indexed_zset, operator::{ Max, Min, time_series::{RelOffset, RelRange}, @@ -186,6 +186,22 @@ fn test_replay( ); } +fn circuit_config(path: &PathBuf) -> CircuitConfig { + CircuitConfig::with_workers(NUM_WORKERS) + .with_splitter_chunk_size_records(2) + .with_mode(Mode::Persistent) + .with_storage(Some( + CircuitStorageConfig::for_config( + StorageConfig { + path: path.to_string_lossy().into_owned(), + cache: Default::default(), + }, + Default::default(), + ) + .unwrap(), + )) +} + fn test_replay_with_step_size( circuit_constructor1: CircuitFn, circuit_constructor2: CircuitFn, @@ -210,22 +226,6 @@ fn test_replay_with_step_size( let path = tempfile::tempdir().unwrap().keep(); println!("Running replay_test in {}", path.display()); - fn circuit_config(path: &PathBuf) -> CircuitConfig { - CircuitConfig::with_workers(NUM_WORKERS) - .with_splitter_chunk_size_records(2) - .with_mode(Mode::Persistent) - .with_storage(Some( - CircuitStorageConfig::for_config( - StorageConfig { - path: path.to_string_lossy().into_owned(), - cache: Default::default(), - }, - Default::default(), - ) - .unwrap(), - )) - } - // Create both reference circuits, feed I1 and I2 to circuit1; feed I2 and I3 to circuit2. let mut reference_output1 = Vec::new(); let mut reference_output2 = Vec::new(); @@ -1690,3 +1690,73 @@ fn test_rolling_circuit() { std::iter::repeat_n((), 20).collect(), ); } + +// Regression test: +// +// Pipeline 1: +// ---> input_map +// +// Pipeline 2: +// ---> input_map ---> aggregate --> output +// +// The second pipeline should replay the input from the input_map operator. +// A bug prevented this from happening, because the integral built by the +// aggregate operator was used to replay instead. + +#[test] +fn regression1() { + init_test_logger(); + + let path = tempfile::tempdir().unwrap().keep(); + + let (mut circuit1, input_handle1) = + Runtime::init_circuit(circuit_config(&path), move |circuit| { + let (input_stream, input_handle) = circuit + .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); + input_stream.set_persistent_id(Some("input_map")); + Ok(input_handle) + }) + .unwrap(); + + input_handle1.push(0, crate::operator::Update::Insert(0)); + + circuit1.transaction().unwrap(); + + // Checkpoint. + let checkpoint = circuit1.checkpoint().run().unwrap(); + circuit1.kill().unwrap(); + + // Restart the second circuit from the checkpoint. + let mut circuit_config = circuit_config(&path); + circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid); + + let (mut circuit2, (_input_handle2, output_handle2)) = + Runtime::init_circuit(circuit_config, move |circuit| { + let (input_stream, input_handle) = circuit + .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); + input_stream.set_persistent_id(Some("input_map")); + + //let input_stream = input_stream.apply_owned(|x| x).set_persistent_id(Some("input_map2")); + + let aggregate = input_stream + .aggregate_persistent(Some("aggregate1"), Max) + .set_persistent_id(Some("aggregate1")); + + let output_handle = aggregate + .accumulate_trace() + .apply(|trace| trace.ro_snapshot().consolidate()) + .output_persistent(Some("output")); + Ok((input_handle, output_handle)) + }) + .unwrap(); + + while circuit2.bootstrap_in_progress() { + circuit2.transaction().unwrap(); + } + println!("Replay finished"); + + //let output = output_handle2.take_from_all().concat().consolidate(); + let actual_output = &output_handle2.concat().consolidate(); + + assert_eq!(actual_output, &indexed_zset!(0 => {0 => 1})); +} From 4f1071e7ec74c0ddbfcb86002adbdfff763fc593 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Mon, 11 May 2026 16:29:41 -0700 Subject: [PATCH 2/8] [dbsp] Transactional bootstrapping. Fixes #4736 Bootstrapping used to be performed in a sequence of transactions. Depending on the program this could be inefficient due to redundant recomputation. In addition this produced multiple small batches of potentialy mutually canceling outputs. We now have all the infra needed to change this. This commit changes Z1Trace and AccumulateZ1Trace operator to behave as splitters, i.e., they replay their entire contents across multiple steps within the same transaction. We also get rid of the replay_step_size knob. Instead we use the existing splitter_chunk_size setting, which controls the number of records produced by splitters per step. Signed-off-by: Leonid Ryzhyk --- crates/dbsp/src/circuit/dbsp_handle.rs | 14 -- crates/dbsp/src/circuit/replay_tests.rs | 232 ++++++++++++++---- crates/dbsp/src/circuit/runtime.rs | 28 --- .../src/operator/dynamic/accumulate_trace.rs | 10 +- crates/dbsp/src/operator/dynamic/trace.rs | 10 +- 5 files changed, 197 insertions(+), 97 deletions(-) diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 42a277fb32..2ae439dfd8 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -1397,20 +1397,6 @@ impl DBSPHandle { Ok(progress) } - pub fn set_replay_step_size(&mut self, step_size: usize) { - if let Some(handle) = self.runtime.as_ref() { - handle.runtime().set_replay_step_size(step_size); - } - } - - pub fn get_replay_step_size(&self) -> usize { - if let Some(handle) = self.runtime.as_ref() { - handle.runtime().get_replay_step_size() - } else { - 0 - } - } - /// The circuit has been resumed from a checkpoint and is currently bootstrapping the modified part of the circuit. pub fn bootstrap_in_progress(&self) -> bool { self.bootstrap_info.is_some() diff --git a/crates/dbsp/src/circuit/replay_tests.rs b/crates/dbsp/src/circuit/replay_tests.rs index 3e9abb250e..a230329c47 100644 --- a/crates/dbsp/src/circuit/replay_tests.rs +++ b/crates/dbsp/src/circuit/replay_tests.rs @@ -1,7 +1,8 @@ use feldera_types::config::StorageConfig; use crate::{ - CmpFunc, DBData, OrdZSet, OutputHandle, RootCircuit, Runtime, Stream, ZSetHandle, ZWeight, + CmpFunc, DBData, IndexedZSetHandle, OrdIndexedZSet, OrdZSet, OutputHandle, RootCircuit, + Runtime, Stream, ZSetHandle, ZWeight, circuit::dbsp_handle::CircuitStorageConfig, default_hash, indexed_zset, operator::{ @@ -148,43 +149,6 @@ type CircuitFn = Arc< ///``` /// /// The common part of the two circuits must return identical results. -fn test_replay( - circuit_constructor1: CircuitFn, - circuit_constructor2: CircuitFn, - inputs1: Vec, - inputs2_1: Vec, - inputs2_2: Vec, - inputs3: Vec, -) where - I1: TestDataType, - I2: TestDataType, - I3: TestDataType, - O1: TestDataType, - O2: TestDataType, - O3: TestDataType, -{ - // Run with replay step size < splitter chunk size. - test_replay_with_step_size::( - circuit_constructor1.clone(), - circuit_constructor2.clone(), - inputs1.clone(), - inputs2_1.clone(), - inputs2_2.clone(), - inputs3.clone(), - Some(1), - ); - - // Run without replay step size > splitter chunk size. - test_replay_with_step_size::( - circuit_constructor1, - circuit_constructor2, - inputs1, - inputs2_1, - inputs2_2, - inputs3, - None, - ); -} fn circuit_config(path: &PathBuf) -> CircuitConfig { CircuitConfig::with_workers(NUM_WORKERS) @@ -202,14 +166,13 @@ fn circuit_config(path: &PathBuf) -> CircuitConfig { )) } -fn test_replay_with_step_size( +fn test_replay( circuit_constructor1: CircuitFn, circuit_constructor2: CircuitFn, inputs1: Vec, inputs2_1: Vec, inputs2_2: Vec, inputs3: Vec, - replay_step_size: Option, ) where I1: TestDataType, I2: TestDataType, @@ -343,10 +306,6 @@ fn test_replay_with_step_size( }) .unwrap(); - if let Some(replay_step_size) = replay_step_size { - circuit.set_replay_step_size(replay_step_size); - } - while circuit.bootstrap_in_progress() { circuit.transaction().unwrap(); } @@ -1736,8 +1695,6 @@ fn regression1() { .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); input_stream.set_persistent_id(Some("input_map")); - //let input_stream = input_stream.apply_owned(|x| x).set_persistent_id(Some("input_map2")); - let aggregate = input_stream .aggregate_persistent(Some("aggregate1"), Max) .set_persistent_id(Some("aggregate1")); @@ -1755,8 +1712,189 @@ fn regression1() { } println!("Replay finished"); - //let output = output_handle2.take_from_all().concat().consolidate(); let actual_output = &output_handle2.concat().consolidate(); + // The bug causes the output to be empty. assert_eq!(actual_output, &indexed_zset!(0 => {0 => 1})); } + +/// Unit test for the replay behavior of Z1Trace and AccumulateZ1Trace operators. +/// Operators must correctly replay their contents during bootstrap as one atomic transaction. + +#[derive(Clone, Copy, Debug)] +enum ReplayTraceKind { + IntegrateTrace, + AccumulateTrace, +} + +type IndexedReplayBatch = Vec>>; + +fn add_replay_trace( + stream: &Stream>, + trace_kind: ReplayTraceKind, +) { + match trace_kind { + ReplayTraceKind::IntegrateTrace => { + stream.integrate_trace(); + } + ReplayTraceKind::AccumulateTrace => { + stream.accumulate_trace(); + } + } +} + +fn transactional_bootstrap_circuit1( + circuit: &mut RootCircuit, + trace_kind: ReplayTraceKind, +) -> IndexedZSetHandle { + let (input_stream, input_handle) = circuit.add_input_indexed_zset::(); + input_stream.set_persistent_id(Some("input")); + add_replay_trace(&input_stream, trace_kind); + input_handle +} + +fn transactional_bootstrap_circuit2( + circuit: &mut RootCircuit, + trace_kind: ReplayTraceKind, +) -> ( + IndexedZSetHandle, + OutputHandle>>, +) { + let (input_stream, input_handle) = circuit.add_input_indexed_zset::(); + input_stream.set_persistent_id(Some("input")); + add_replay_trace(&input_stream, trace_kind); + + let output_handle = input_stream.accumulate_output_persistent(Some("output")); + + (input_handle, output_handle) +} + +fn replay_batch_to_indexed_zset(batches: &[IndexedReplayBatch]) -> OrdIndexedZSet { + OrdIndexedZSet::from_tuples( + (), + batches + .iter() + .flatten() + .map(|Tup2(key, Tup2(value, weight))| Tup2(Tup2(*key, *value), *weight)) + .collect(), + ) +} + +fn run_transactional_bootstrap_test( + trace_kind: ReplayTraceKind, + batches: Vec, + expect_multistep_replay: bool, +) { + init_test_logger(); + + let path = tempfile::tempdir().unwrap().keep(); + let expected = replay_batch_to_indexed_zset(&batches); + + let checkpoint = { + let (mut circuit, input_handle) = + Runtime::init_circuit(circuit_config(&path), move |circuit| { + Ok(transactional_bootstrap_circuit1(circuit, trace_kind)) + }) + .unwrap(); + + for mut batch in batches.clone() { + input_handle.append(&mut batch); + circuit.transaction().unwrap(); + } + + let checkpoint = circuit.checkpoint().run().unwrap(); + circuit.kill().unwrap(); + checkpoint + }; + + let mut circuit_config = circuit_config(&path); + circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid); + + let (mut circuit, (_input_handle, output_handle)) = + Runtime::init_circuit(circuit_config, move |circuit| { + Ok(transactional_bootstrap_circuit2(circuit, trace_kind)) + }) + .unwrap(); + + assert_eq!(output_handle.num_nonempty_mailboxes(), 0); + + if circuit.bootstrap_in_progress() { + circuit.start_transaction().unwrap(); + circuit.start_commit_transaction().unwrap(); + + let mut incomplete_commit_steps = 0; + loop { + let commit_complete = circuit.step().unwrap(); + if commit_complete { + break; + } + + incomplete_commit_steps += 1; + } + + if expect_multistep_replay { + assert!( + incomplete_commit_steps > 0, + "{trace_kind:?} replay finished in a single commit step despite the splitter chunk size" + ); + } + } + + assert!(!circuit.bootstrap_in_progress()); + assert_eq!(output_handle.concat().consolidate(), expected); + + circuit.kill().unwrap(); +} + +fn transactional_bootstrap_cases() -> Vec<(Vec, bool)> { + vec![ + (vec![], false), + (vec![vec![Tup2(1, Tup2(10, 1))]], false), + ( + vec![vec![Tup2(1, Tup2(10, 1)), Tup2(1, Tup2(11, 1))]], + false, + ), + ( + vec![ + vec![ + Tup2(1, Tup2(10, 1)), + Tup2(1, Tup2(11, 1)), + Tup2(2, Tup2(20, 1)), + ], + vec![ + Tup2(1, Tup2(11, -1)), + Tup2(1, Tup2(12, 1)), + Tup2(4, Tup2(40, 2)), + Tup2(5, Tup2(50, 2)), + Tup2(6, Tup2(50, 2)), + Tup2(7, Tup2(50, 2)), + Tup2(8, Tup2(50, 2)), + Tup2(9, Tup2(50, 2)), + ], + ], + true, + ), + ] +} + +#[test] +fn test_integrate_trace_bootstrap_is_transactional() { + for (batches, expect_multistep_replay) in transactional_bootstrap_cases() { + run_transactional_bootstrap_test( + ReplayTraceKind::IntegrateTrace, + batches, + expect_multistep_replay, + ); + } +} + +#[test] +fn test_accumulate_trace_bootstrap_is_transactional() { + for (batches, expect_multistep_replay) in transactional_bootstrap_cases() { + run_transactional_bootstrap_test( + ReplayTraceKind::AccumulateTrace, + batches, + expect_multistep_replay, + ); + } +} diff --git a/crates/dbsp/src/circuit/runtime.rs b/crates/dbsp/src/circuit/runtime.rs index 33b760126a..37cfdba62b 100644 --- a/crates/dbsp/src/circuit/runtime.rs +++ b/crates/dbsp/src/circuit/runtime.rs @@ -64,9 +64,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use typedmap::TypedDashMap; -/// The number of tuples a stateful operator outputs per step during replay. -pub const DEFAULT_REPLAY_STEP_SIZE: usize = 10000; - #[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub enum Error { /// Specified persistent id is not found in the circuit. @@ -304,7 +301,6 @@ struct RuntimeInner { /// Panic info collected from failed worker threads. panic_info: Vec>>>, panicked: AtomicBool, - replay_step_size: AtomicUsize, /// Tokio runtime that runs async merger tasks (see `AsyncMerger`). tokio_merger_runtime: Mutex>, @@ -511,7 +507,6 @@ impl RuntimeInner { .map(|_| EnumMap::from_fn(|_| RwLock::new(None))) .collect(), panicked: AtomicBool::new(false), - replay_step_size: AtomicUsize::new(DEFAULT_REPLAY_STEP_SIZE), tokio_merger_runtime: Mutex::new(None), exchange_listener: Mutex::new(config.exchange_listener), }) @@ -1063,29 +1058,6 @@ impl Runtime { self.inner().step_size } - /// Configure the number of tuples a stateful operator outputs per step during replay. - /// - /// The default is `DEFAULT_REPLAY_STEP_SIZE`. - pub fn set_replay_step_size(&self, step_size: usize) { - self.inner() - .replay_step_size - .store(step_size, Ordering::Release); - } - - /// Get currently configured replay step size. - /// - /// Returns `DEFAULT_REPLAY_STEP_SIZE` if the current thread doesn't have a runtime. - pub fn replay_step_size() -> usize { - RUNTIME - .with(|rt| Some(rt.borrow().as_ref()?.get_replay_step_size())) - .unwrap_or(DEFAULT_REPLAY_STEP_SIZE) - } - - /// Get currently configured replay step size. - pub fn get_replay_step_size(&self) -> usize { - self.inner().replay_step_size.load(Ordering::Acquire) - } - /// Returns the worker index as a string. /// /// This is useful for metric labels. diff --git a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs index 829be69c11..0fef3ebbf6 100644 --- a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs +++ b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs @@ -1,6 +1,6 @@ -use crate::Runtime; use crate::circuit::circuit_builder::{StreamId, register_replay_stream}; use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS}; +use crate::circuit::splitter_output_chunk_size; use crate::dynamic::{Factory, Weight, WeightTrait}; use crate::operator::dynamic::trace::{DelayedTraceId, TraceBounds}; use crate::operator::{TraceBound, require_persistent_id}; @@ -1117,7 +1117,7 @@ where { fn get_output(&mut self) -> T { //println!("Z1-{}::get_output", &self.global_id); - let replay_step_size = Runtime::replay_step_size(); + let replay_step_size = splitter_output_chunk_size(); if self.replay_mode { // One output per transaction. @@ -1161,6 +1161,7 @@ where self.delta_stream.as_ref().unwrap().value().put(batch); if !replay.borrow_cursor().key_valid() { self.replay_state = None; + self.flush_output = false; } } else { // Continue producing empty outputs as long as the circuit is in the replay mode. @@ -1169,11 +1170,12 @@ where .unwrap() .value() .put(B::dyn_empty(&self.batch_factories)); + self.flush_output = false; } + } else { + self.flush_output = false; } - self.flush_output = false; - let mut result = self.trace.take().unwrap(); result.clear_dirty_flag(); result diff --git a/crates/dbsp/src/operator/dynamic/trace.rs b/crates/dbsp/src/operator/dynamic/trace.rs index 2d90686a3f..85270e1463 100644 --- a/crates/dbsp/src/operator/dynamic/trace.rs +++ b/crates/dbsp/src/operator/dynamic/trace.rs @@ -1,6 +1,6 @@ -use crate::Runtime; use crate::circuit::circuit_builder::{StreamId, register_replay_stream}; use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS}; +use crate::circuit::splitter_output_chunk_size; use crate::dynamic::{Factory, Weight, WeightTrait}; use crate::operator::require_persistent_id; use crate::trace::spine_async::WithSnapshot; @@ -1184,7 +1184,7 @@ where { fn get_output(&mut self) -> T { //println!("Z1-{}::get_output", &self.global_id); - let replay_step_size = Runtime::replay_step_size(); + let replay_step_size = splitter_output_chunk_size(); if self.replay_mode { // One output per transaction. @@ -1228,6 +1228,7 @@ where self.delta_stream.as_ref().unwrap().value().put(batch); if !replay.borrow_cursor().key_valid() { self.replay_state = None; + self.flush_output = false; } } else { // Continue producing empty outputs as long as the circuit is in the replay mode. @@ -1236,11 +1237,12 @@ where .unwrap() .value() .put(B::dyn_empty(&self.batch_factories)); + self.flush_output = false; } + } else { + self.flush_output = false; } - self.flush_output = false; - let mut result = self.trace.take().unwrap(); result.clear_dirty_flag(); result From 941e099443cd500f8d0ad5d565b3c6634413120a Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Mon, 11 May 2026 17:07:25 -0700 Subject: [PATCH 3/8] [qa] testutils.py: support dev_tweaks. Signed-off-by: Leonid Ryzhyk --- python/feldera/testutils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/feldera/testutils.py b/python/feldera/testutils.py index 1f4fd45150..3324a40f22 100644 --- a/python/feldera/testutils.py +++ b/python/feldera/testutils.py @@ -264,6 +264,7 @@ def build_pipeline( tables: dict, views: List[ViewSpec], resources: Optional[Resources] = None, + dev_tweaks: Optional[dict] = None, ) -> Pipeline: sql = generate_program(tables, views) @@ -277,6 +278,7 @@ def build_pipeline( resources=resources, workers=FELDERA_TEST_NUM_WORKERS, hosts=FELDERA_TEST_NUM_HOSTS, + dev_tweaks=dev_tweaks, ), ).create_or_replace() From 15dace0720ad95a057c9d456e5e5f2231598d4ff Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Thu, 7 May 2026 11:26:26 -0700 Subject: [PATCH 4/8] [qa] Improved TPC-H test. Improve TPC-H test in `checkpoint` mode, which can be used to torture-test bootstrapping: - Configurable number and size of test segments. With these new options we can scale the test up and down using the same dataset (typically TPC-H). - Check that views are initialized after bootstrapping. Example: Split views into 2 groups 10M records each. ``` uv run test_tpch.py --s3-bucket feldera-qa-data --s3-prefix tpc-h-100 --s3-region us-west-1 --mode checkpoint --num-segments 2 --segment-size 10000000 ``` Signed-off-by: Leonid Ryzhyk --- python/feldera/testutils.py | 25 +++ python/tests/workloads/test_tpch.py | 235 +++++++++++++++++++--------- 2 files changed, 190 insertions(+), 70 deletions(-) diff --git a/python/feldera/testutils.py b/python/feldera/testutils.py index 3324a40f22..ba8d4b5c77 100644 --- a/python/feldera/testutils.py +++ b/python/feldera/testutils.py @@ -321,6 +321,25 @@ def transaction(pipeline: Pipeline, duration_seconds: int): log(f"Transaction committed in {time.monotonic() - commit_start} seconds") +def transaction_num_records(pipeline: Pipeline, num_records: int): + """Run a transaction until it ingests a record count or reaches end of input.""" + + log(f"Running transaction for {num_records} records or end of input") + initial_records = number_of_processed_records(pipeline) + pipeline.start_transaction() + + while not check_end_of_input(pipeline): + processed_records = number_of_processed_records(pipeline) - initial_records + if processed_records >= num_records: + break + time.sleep(3) + + log("Committing transaction") + commit_start = time.monotonic() + pipeline.commit_transaction() + log(f"Transaction committed in {time.monotonic() - commit_start} seconds") + + def checkpoint_pipeline(pipeline: Pipeline): """Create a checkpoint and wait for it to complete.""" @@ -364,6 +383,12 @@ def number_of_processed_records(pipeline: Pipeline) -> int: return pipeline.stats().global_metrics.total_processed_records +def number_of_input_records(pipeline: Pipeline) -> int: + """Get the total_input_records metric.""" + + return pipeline.stats().global_metrics.total_input_records + + def run_workload( pipeline_name: str, tables: dict, diff --git a/python/tests/workloads/test_tpch.py b/python/tests/workloads/test_tpch.py index d68d3dccc7..2d4bc5ec46 100644 --- a/python/tests/workloads/test_tpch.py +++ b/python/tests/workloads/test_tpch.py @@ -1,20 +1,24 @@ import sys import time import unittest +from feldera.enums import BootstrapPolicy from feldera.pipeline import Pipeline from feldera.testutils import ( IndexSpec, ViewSpec, build_pipeline, check_for_endpoint_errors, + check_end_of_input, checkpoint_pipeline, generate_program, log, number_of_processed_records, + number_of_input_records, run_workload, - transaction, + transaction_num_records, unique_pipeline_name, validate_outputs, + wait_end_of_input, ) import tempfile import os @@ -32,6 +36,8 @@ def __init__( s3_path: Optional[str] = None, s3_region: Optional[str] = None, input_dir: Optional[str] = None, + segment_size: Optional[int] = None, + num_segments: Optional[int] = None, ): self.mode = mode @@ -39,6 +45,12 @@ def __init__( raise ValueError(f"Unknown mode: {mode}") self.input_mode = input_mode + self.segment_size = segment_size + self.num_segments = num_segments + + if self.num_segments is not None: + if self.num_segments <= 0 or self.num_segments > 20: + raise ValueError("num_segments must be between 1 and 20") if self.input_mode == "file": self.input_dir = input_dir @@ -117,6 +129,22 @@ def run_cli(): help="S3 bucket region. Required if --s3-bucket or --s3-path is specified.", ) + parser.add_argument( + "--num-segments", + type=int, + nargs="?", + default=None, + help="Number of test segments. Only used in checkpoint mode. The test divides all views in the benchmark into this many groups and adds one group of views per segment to the pipeline.", + ) + + parser.add_argument( + "--segment-size", + type=int, + nargs="?", + default=None, + help="Approximate number of records ingested per segment. Only used in checkpoint mode.", + ) + args = parser.parse_args() if sum(x is not None for x in (args.s3_bucket, args.s3_path, args.input_dir)) > 1: @@ -131,6 +159,7 @@ def run_cli(): elif args.s3_bucket: input_mode = "s3" s3_region = args.s3_region + s3_path = None elif args.s3_path: input_mode = "delta" s3_path = args.s3_path @@ -153,6 +182,8 @@ def run_cli(): s3_region=s3_region, s3_path=s3_path, input_dir=args.input_dir, + segment_size=args.segment_size, + num_segments=args.num_segments, ) tpch_test(config) @@ -1297,24 +1328,34 @@ def tpch_test_segment( pipeline: Pipeline, tables: dict, views: List[ViewSpec], - expected_processed_records, + last_processed: int, segment_size: int, -) -> tuple[bool, int]: + previously_non_empty_views: List[str], +) -> tuple[bool, int, List[str]]: """Run a test segment. - Start the pipeline (from a checkpoint if one exists), run a series of transactions followed by streaming ingest periods, - until the pipeline processed segment_size records. A checkpoint is created halfway through the segment, or at the end. The - pipeline is then paused,outputs validated, and the pipeline stopped. + Start the pipeline (from a checkpoint if one exists), run a transaction to process segment_size records. + A checkpoint is created at the end of the segment. The pipeline is then paused, outputs validated, and + the pipeline stopped. + + previously_non_empty_views is a list of views that were non-empty before the start of the segment. + This is used to check that the views are still non-empty when the pipeline is restarted, i.e., + output snapshots are correctly populated by the bootstrapping process. """ # Start pipeline. start_time = time.monotonic() - log( - f"Starting pipeline to process {segment_size} records, starting from (approximately) {expected_processed_records} processed records" - ) - pipeline.start() + log(f"Starting pipeline to process {segment_size} records") + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) log(f"Pipeline started in {time.monotonic() - start_time} seconds") + for view_name in previously_non_empty_views: + count = view_row_count(pipeline, view_name) + if count == 0: + raise RuntimeError( + f"View {view_name} was non-empty before restart, but is empty after restart" + ) + # Current number of processed records. initial_processed_records = number_of_processed_records(pipeline) @@ -1322,56 +1363,25 @@ def tpch_test_segment( f"Initial processed records at the start of a segment: {initial_processed_records}" ) - if initial_processed_records < expected_processed_records: + if initial_processed_records < last_processed: raise RuntimeError( - f"Expected at least {expected_processed_records} processed records on startup, got {initial_processed_records}" + f"Expected at least {last_processed} processed records on startup, got {initial_processed_records}" ) - # Expected number of processed records after this segment. - expected_processed_records = initial_processed_records + segment_size - - # Make a checkpoint halfway through the segment after processing this many records. - halfway_processed_records = ( - initial_processed_records + expected_processed_records - ) >> 1 - - checkpoint = False + # Transaction + transaction_num_records(pipeline, segment_size) + check_for_endpoint_errors(pipeline) - while not pipeline.is_complete(): - current_processed_records = number_of_processed_records(pipeline) - log( - f"Processed {current_processed_records} total records so far (processed {current_processed_records - initial_processed_records} records in this segment)" - ) + processed_before_checkpoint = number_of_processed_records(pipeline) + non_empty_views = [ + view.name for view in views if view_row_count(pipeline, view.name) > 0 + ] + checkpoint_pipeline(pipeline) - if current_processed_records >= expected_processed_records: - log("Сompleting test segment") - break - - # Transaction - transaction(pipeline, 100) - check_for_endpoint_errors(pipeline) - - # Streaming ingest (no transaction) - log("Running streaming ingest for 10 seconds") - time.sleep(10) - check_for_endpoint_errors(pipeline) - - if not checkpoint: - processed_before_checkpoint = number_of_processed_records(pipeline) - - if processed_before_checkpoint >= halfway_processed_records: - log( - f"Creating checkpoint after processing {processed_before_checkpoint} records" - ) - checkpoint_pipeline(pipeline) - checkpoint = True - - if not checkpoint: - processed_before_checkpoint = number_of_processed_records(pipeline) - log( - f"Creating checkpoint at the end of the segment after processing {processed_before_checkpoint} records" - ) - checkpoint(pipeline) + # Streaming ingest (no transaction) + log("Running streaming ingest for 10 seconds") + time.sleep(10) + check_for_endpoint_errors(pipeline) pipeline.pause() @@ -1383,7 +1393,14 @@ def tpch_test_segment( log("Stopping pipeline") pipeline.stop(force=True) - return (complete, processed_before_checkpoint) + return (complete, processed_before_checkpoint, non_empty_views) + + +def view_row_count(pipeline: Pipeline, view_name: str) -> int: + escaped_view_name = view_name.replace('"', '""') + return next(pipeline.query(f'SELECT COUNT(*) as cnt FROM "{escaped_view_name}"'))[ + "cnt" + ] def tpch_test(config: TPCHTestConfig): @@ -1401,29 +1418,107 @@ def tpch_test(config: TPCHTestConfig): views = tpch_views(q_dirs) if config.mode == "checkpoint": + log("Starting checkpoint mode with all tables and no views") pipeline = build_pipeline( - unique_pipeline_name("tpc-h-checkpoint"), tables, views + unique_pipeline_name("tpc-h-checkpoint"), + tables, + [], + dev_tweaks={"adaptive_joins": True}, ) + if config.segment_size is not None: + segment_size = config.segment_size + else: + segment_size = 100_000_000 + + if config.num_segments is not None: + views_per_segment = ( + len(views) + config.num_segments - 1 + ) // config.num_segments + else: + views_per_segment = 5 last_processed = 0 - iteration = 1 - modified_views = views - while True: - (complete, last_processed) = tpch_test_segment( - pipeline, tables, modified_views, last_processed, 100000000 + complete = False + non_empty_views: List[str] = [] + + view_counts = list(range(views_per_segment, len(views) + 1, views_per_segment)) + if views and (not view_counts or view_counts[-1] != len(views)): + view_counts.append(len(views)) + + for view_count in view_counts: + modified_views = views[:view_count] + log( + f"Checkpoint view-add phase: adding {view_count}/{len(views)} views: " + f"{', '.join(view.name for view in modified_views)}" + ) + + sql = generate_program(tables, modified_views) + pipeline.modify(sql=sql) + log( + f"Running checkpoint segment with {view_count} views from {last_processed} processed records" + ) + + (complete, last_processed, non_empty_views) = tpch_test_segment( + pipeline, + tables, + modified_views, + last_processed, + segment_size, + non_empty_views, + ) + log( + f"Completed checkpoint view-add segment with complete={complete}, last_processed={last_processed}" ) if complete: break - iteration += 1 - modified_views = list( - map( - lambda view: view.clone_with_name(f"{view.name}_{iteration}"), views - ) - ) + # Test a different type of pipeline change: rename all views in the program. + modified_views = [ + view.clone_with_name(f"{view.name}_renamed") for view in views + ] + log(f"Renaming all views: {', '.join(view.name for view in modified_views)}") + + sql = generate_program(tables, modified_views) + pipeline.modify(sql=sql) + + # Process remaining data in one transaction. + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) + start_time = time.monotonic() + + try: + pipeline.start_transaction() + except Exception as e: + log(f"Error starting transaction: {e}") + + if config.segment_size is not None: + expected_inputs = number_of_input_records(pipeline) + config.segment_size + while number_of_input_records( + pipeline + ) < expected_inputs and not check_end_of_input(pipeline): + time.sleep(3) + else: + wait_end_of_input(pipeline, timeout_s=3600) - sql = generate_program(tables, modified_views) - pipeline.modify(sql=sql) + elapsed = time.monotonic() - start_time + log(f"Remaining data ingested in {elapsed}") + + start_time = time.monotonic() + try: + pipeline.commit_transaction(transaction_id=None, wait=True, timeout_s=None) + log(f"Commit took {time.monotonic() - start_time}") + except Exception as e: + log(f"Error committing transaction: {e}") + + pipeline.pause() + + # log("Waiting for outputs to flush") + # start_time = time.monotonic() + # pipeline.wait_for_completion(force_stop=False, timeout_s=3600) + # log(f"Flushing outputs took {time.monotonic() - start_time}") + + validate_outputs(pipeline, tables, modified_views) + + pipeline.stop(force=True) elif config.mode == "transaction": pipeline = run_workload( From 89f8276bd2b28bbc46cb85f8891afed5e5123b3e Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Tue, 12 May 2026 12:15:00 -0700 Subject: [PATCH 5/8] [dbsp] Simplify transaction state tracking in scheduler. Remove redundant TransactionPhase::Idle status. Signed-off-by: Leonid Ryzhyk --- crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs b/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs index d631e57752..75581b4122 100644 --- a/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs +++ b/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs @@ -147,9 +147,6 @@ impl Notifications { } enum TransactionPhase { - /// Not started - Idle, - /// Started, but not yet committing. Started, @@ -360,7 +357,7 @@ impl Inner { notifications: Notifications::new(num_async_nodes), handles: JoinSet::new(), waiting: false, - transaction_phase: TransactionPhase::Idle, + transaction_phase: TransactionPhase::CommitComplete, global_commit_consensus: Broadcast::new(), metadata_broadcast: Broadcast::new(), before_first_step: true, From 9d6322a7b8dae54ffe5063b595ae7dfef154c662 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Tue, 12 May 2026 12:17:55 -0700 Subject: [PATCH 6/8] [dbsp] Fix CircuitHandle::is_replay_complete. Bootstrapping is considered completed when all replay sources are complete and the bootstrapping transaction has committed. The latter condition was missing. This did not cause any issues because we only called this function between transactions. Signed-off-by: Leonid Ryzhyk --- crates/dbsp/src/circuit/circuit_builder.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index 27335278a8..be11379c03 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -7607,10 +7607,16 @@ impl CircuitHandle { return true; }; - replay_info.replay_sources.keys().all(|node_id| { + // Bootstrapping is finished when all replay sources have completed their replay and the + // transaction has been committed. + + let all_complete = replay_info.replay_sources.keys().all(|node_id| { self.circuit .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete()) - }) + }); + + all_complete && self.is_commit_complete() + } /// Finalize the replay phase of the circuit. From 555c33c669a579d1a5c8a009ea28780795241a15 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Tue, 12 May 2026 12:22:40 -0700 Subject: [PATCH 7/8] [adapters] Update output snapshots before completing bootstrapping. Fix #6091. Views that don't participate in bootstrapping don't update their snapshots until the first post-bootstrap transaction. As a result, a client making ad hoc queries right after bootstrapping completes could observe empty views. We fix this by: 1. Forcing an extra transaction after bootstrapping completes (this was already the case). 2. Maintaining `bootstrap_in_progress` status until the extra transaction commits. Signed-off-by: Leonid Ryzhyk --- crates/adapters/src/controller.rs | 68 +++++++++++----------- crates/dbsp/src/circuit/circuit_builder.rs | 1 - 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 5143cf25f6..ab447b1274 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -2491,6 +2491,10 @@ struct CircuitThread { input_metadata: HashMap>, commit_updates: Option, + + /// Set to true on startup if the circuit requires bootstrapping. + /// Cleared when the circuit completes bootstrapping. + bootstrapping: bool, } struct CommitUpdates { @@ -2759,9 +2763,9 @@ impl CircuitThread { incarnation_uuid, )?; - controller - .status - .set_bootstrap_in_progress(circuit.bootstrap_in_progress()); + let bootstrapping = circuit.bootstrap_in_progress(); + + controller.status.set_bootstrap_in_progress(bootstrapping); let input_metadata = input_metadata.map(|input_metadata| { input_metadata @@ -2777,7 +2781,7 @@ impl CircuitThread { // The pipeline hasn't changed based on input and output persistent id values, // yet the circuit is bootstrapping. This is a bug. - if can_replay && circuit.bootstrap_in_progress() { + if can_replay && bootstrapping { return Err(ControllerError::UnexpectedBootstrap { bootstrap_info: circuit.bootstrap_info().clone(), }); @@ -2798,10 +2802,10 @@ impl CircuitThread { }?; // The above code ensures that replay and bootstrapping cannot happen at the same time. - assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress())); + assert!(!(ft.is_replaying() && bootstrapping)); // Disable journaling while we're bootstrapping the circuit. - if circuit.bootstrap_in_progress() { + if bootstrapping { ft.disable(); } @@ -2829,6 +2833,7 @@ impl CircuitThread { checkpoint_sender, input_metadata: input_metadata.unwrap_or_default(), commit_updates: None, + bootstrapping, }) } @@ -2857,10 +2862,7 @@ impl CircuitThread { // so that if the first step() we perform below before entering the loop // ends up finishing bootstrapping, we will still perform an extra step to initialize // the output table snapshots inside the loop. - let mut trigger = StepTrigger::new( - self.controller.clone(), - self.circuit.bootstrap_in_progress(), - ); + let mut trigger = StepTrigger::new(self.controller.clone()); if config.global.cpu_profiler { self.circuit.enable_cpu_profiler().unwrap_or_else(|e| { error!("Failed to enable CPU profiler: {e}"); @@ -2875,7 +2877,7 @@ impl CircuitThread { // // Skip this during bootstrap to avoid a slow first step. We don't guarantee // that view snapshots are up-to-date until bootstrap is complete. - if !self.circuit.bootstrap_in_progress() + if !self.bootstrapping && let Err(error) = self.step() { let _ = init_status_sender.send(Err(error)); @@ -2949,7 +2951,11 @@ impl CircuitThread { self.last_checkpoint(), self.last_checkpoint_sync(), self.replaying(), - self.circuit.bootstrap_in_progress(), + // `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping is complete, + // which is required to initialize the output snapshots. + // We want the trigger to trigger that extra transaction; therefore we pass `status.bootstrap_in_progress` + // rather than `self.bootstrapping` here. + self.controller.status.bootstrap_in_progress(), self.checkpoint_requested(), self.sync_checkpoint_requested(), coordination_request, @@ -3022,11 +3028,6 @@ impl CircuitThread { transaction_state.into_coordination_status(), )); - // If bootstrapping has completed, update the status flag. - self.controller - .status - .set_bootstrap_in_progress(self.circuit.bootstrap_in_progress()); - // Update `trace_snapshot` to the latest traces. // // We do this before updating `total_processed_records` so that ad hoc @@ -3038,6 +3039,20 @@ impl CircuitThread { .with_category("Step") .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step)) .in_scope(|| self.update_snapshot()); + + let bootstrapping = self.circuit.bootstrap_in_progress(); + + // If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag + // until the circuit performs an extra transaction to initialize output snapshots + // (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()` + // is true). + if self.bootstrapping && !bootstrapping { + self.bootstrapping = false; + } else { + self.controller + .status + .set_bootstrap_in_progress(bootstrapping); + } } // Record that we've processed the records, unless there is a transaction in progress, @@ -4243,10 +4258,6 @@ struct StepTrigger { /// Time between automatic checkpoint syncs. sync_interval: Option, - - /// The circuit is bootstrapping. Used to detect the transition from bootstrapping - /// to normal mode. - bootstrapping: bool, } /// Action for the controller to take. @@ -4267,7 +4278,7 @@ enum Action { impl StepTrigger { /// Returns a new [StepTrigger]. - fn new(controller: Arc, bootstrapping: bool) -> Self { + fn new(controller: Arc) -> Self { let config = &controller.status.pipeline_config.global; let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs); let min_batch_size_records = config.min_batch_size_records; @@ -4287,7 +4298,6 @@ impl StepTrigger { max_buffering_delay, min_batch_size_records, checkpoint_interval, - bootstrapping, sync_interval, } } @@ -4349,16 +4359,7 @@ impl StepTrigger { } _ => Some(Action::Park(None)), } - } else if replaying - || self.controller.transaction_commit_requested() - || bootstrapping - || self.bootstrapping - { - // The `self.bootstrapping` condition above detects a transition - // from bootstrapping to normal operation and makes sure that the - // circuit performs an extra step in the normal mode in order to - // initialize output table snapshots of output relations that did - // not participate in bootstrapping. + } else if replaying || self.controller.transaction_commit_requested() || bootstrapping { Some(Action::Step) } else if timer_expired(next_checkpoint, now) && !checkpoint_requested { Some(Action::Checkpoint) @@ -4425,7 +4426,6 @@ impl StepTrigger { } }; - self.bootstrapping = bootstrapping; if result == Action::Step { self.buffer_timeout = None; } diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index be11379c03..97a67cbd18 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -7616,7 +7616,6 @@ impl CircuitHandle { }); all_complete && self.is_commit_complete() - } /// Finalize the replay phase of the circuit. From dffc5b149f3045c4e104fa7bd329de269d119db4 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Thu, 14 May 2026 17:07:42 -0700 Subject: [PATCH 8/8] [adapters] Don't initiailize snapshots until bootstrapping completes. Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction). This guarantees that: 1. Ad hoc queries observe a consistent snapshot of all views. 2. Connectors configured with `send_snapshot=true` don't receive empty . Extended the Python output snapshot test with an extra step that fails without this fix. Signed-off-by: Leonid Ryzhyk --- crates/adapters/src/controller.rs | 14 ++- python/tests/runtime/test_output_snapshot.py | 102 ++++++++++++++++++- 2 files changed, 111 insertions(+), 5 deletions(-) diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index ab447b1274..2c97597b64 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -3035,10 +3035,16 @@ impl CircuitThread { // processing; otherwise, there is a race for any code that runs a query // as soon as input has been processed. if transaction_state == TransactionState::None { - Span::new("update") - .with_category("Step") - .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step)) - .in_scope(|| self.update_snapshot()); + // Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction). + // This guarantees that: + // 1. Ad hoc queries observe a consistent view of the data. + // 2. Ad hoc snapshots are up-to-date before the pipeline is marked as running. + if !self.controller.status.bootstrap_in_progress() { + Span::new("update") + .with_category("Step") + .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step)) + .in_scope(|| self.update_snapshot()); + } let bootstrapping = self.circuit.bootstrap_in_progress(); diff --git a/python/tests/runtime/test_output_snapshot.py b/python/tests/runtime/test_output_snapshot.py index 4a4d921508..826f627041 100644 --- a/python/tests/runtime/test_output_snapshot.py +++ b/python/tests/runtime/test_output_snapshot.py @@ -120,11 +120,12 @@ def _delta_connector( location: DeltaTestLocation, *, send_snapshot: bool, + name: str = "delta_out", index: str | None = None, ) -> dict: """Build a `delta_table_output` connector config.""" connector: dict = { - "name": "delta_out", + "name": name, "send_snapshot": send_snapshot, "transport": { "name": "delta_table_output", @@ -185,6 +186,58 @@ def _build_sql(locations: list[DeltaTestLocation], send_snapshot: bool) -> str: return "\n\n".join(parts) +def _build_sql_round3( + locations: list[DeltaTestLocation], + delta1_location: DeltaTestLocation, + delta2_location: DeltaTestLocation, + delta3_location: DeltaTestLocation, +) -> str: + """Render SQL for round 3. + + The original connectors remain unchanged with ``send_snapshot=true``. + The new view forces bootstrapping. ``delta2`` and ``delta3`` are added + to an existing view with snapshot enabled and disabled, respectively. + """ + parts = [ + "CREATE TABLE t1(id INT NOT NULL, n BIGINT NOT NULL, s VARCHAR NOT NULL)\n" + " WITH ('materialized' = 'true');", + ] + for (label, view, index, indexes_sql), loc in zip(_VIEWS, locations): + connectors = [ + _delta_connector(loc, send_snapshot=True, index=index), + ] + if label == "v_plain": + connectors.extend( + [ + _delta_connector( + delta2_location, + send_snapshot=True, + name="delta2", + ), + _delta_connector( + delta3_location, + send_snapshot=False, + name="delta3", + ), + ] + ) + parts.append( + f"CREATE MATERIALIZED VIEW {view} WITH (\n" + f" 'connectors' = '{json.dumps(connectors)}'\n" + f") AS SELECT * FROM t1;" + ) + if indexes_sql: + parts.append(indexes_sql) + + delta1 = _delta_connector(delta1_location, send_snapshot=False, name="delta1") + parts.append( + "CREATE MATERIALIZED VIEW v_added WITH (\n" + f" 'connectors' = '{json.dumps([delta1])}'\n" + ") AS SELECT * FROM t1;" + ) + return "\n\n".join(parts) + + @skip_on_arm64 # https://github.com/delta-io/delta-rs/issues/4413 def test_delta_output_send_snapshot_after_flag_flip(): """Verify snapshot delivery to delta sinks across a connector @@ -215,6 +268,13 @@ def test_delta_output_send_snapshot_after_flag_flip(): proves the initial snapshot delivered the full materialized-view contents — for every combination of indexes and which index the connector reads through. + + Round 3 adds a new view with a new Delta connector (``delta1``) and + adds two Delta connectors to an existing view: ``delta2`` with + ``send_snapshot=true`` and ``delta3`` with ``send_snapshot=false``. + Adding the view forces actual bootstrapping. ``delta1`` and + ``delta2`` should be populated during startup, while ``delta3`` + should remain empty until future deltas arrive. """ pipeline_name = unique_pipeline_name( "test_delta_output_send_snapshot_after_flag_flip" @@ -222,6 +282,9 @@ def test_delta_output_send_snapshot_after_flag_flip(): locations = [ DeltaTestLocation.create(f"{pipeline_name}_{label}") for label, *_ in _VIEWS ] + delta1_location = DeltaTestLocation.create(f"{pipeline_name}_delta1") + delta2_location = DeltaTestLocation.create(f"{pipeline_name}_delta2") + delta3_location = DeltaTestLocation.create(f"{pipeline_name}_delta3") expected = sorted_rows(_ROWS) # Round 1: send_snapshot=false. Push three rows; they land via the @@ -272,3 +335,40 @@ def test_delta_output_send_snapshot_after_flag_flip(): for (_, *_), loc in zip(_VIEWS, locations): assert sorted_rows(loc.read_rows()) == expected + + pipeline.checkpoint(wait=True) + pipeline.stop(force=True) + + # Round 3: add a new view and new output connectors to an existing view. + # The new view forces bootstrapping. The connector on the new view and + # the existing-view connector with send_snapshot=true are populated during + # startup; the existing-view connector with send_snapshot=false is not. + TEST_CLIENT.patch_pipeline( + name=pipeline_name, + sql=_build_sql_round3( + locations, + delta1_location, + delta2_location, + delta3_location, + ), + runtime_config=RuntimeConfig( + workers=FELDERA_TEST_NUM_WORKERS, + storage=Storage(config={"start_from_checkpoint": "latest"}), + ).to_dict(), + ) + pipeline = Pipeline.get(pipeline_name, TEST_CLIENT) + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) + + wait_for_condition( + "round 3: delta1 on new bootstrapped view receives rows", + lambda: sorted_rows(delta1_location.read_rows()) == expected, + timeout_s=60.0, + poll_interval_s=1.0, + ) + wait_for_condition( + "round 3: delta2 on existing view receives snapshot rows", + lambda: sorted_rows(delta2_location.read_rows()) == expected, + timeout_s=60.0, + poll_interval_s=1.0, + ) + assert delta3_location.read_rows() == []