diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 879ef118a94..b1c75e39ba1 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -2395,6 +2395,9 @@ struct CircuitThread { ft: Option, parker: Parker, + /// Whether to suppress output connector records during bootstrapping. + silent_bootstrap: bool, + checkpoint_delay_warning: Option, checkpoint_requests: Vec, running_checkpoint: Option, @@ -2428,6 +2431,10 @@ struct CircuitThread { input_metadata: HashMap>, commit_updates: Option, + + /// Set to true on startup if the circuit requires bootstrapping. + /// Cleared when the circuit completes bootstrapping. + bootstrapping: bool, } struct CommitUpdates { @@ -2594,9 +2601,10 @@ impl CircuitThread { if !diff.is_empty() { info!("Pipeline changes detected: {diff}"); - if state.bootstrap_policy() == BootstrapPolicy::Reject { + let bootstrap_policy = state.bootstrap_config().bootstrap_policy; + if bootstrap_policy == BootstrapPolicy::Reject { return Err(ControllerError::BootstrapRejectedByUser); - } else if state.bootstrap_policy() == BootstrapPolicy::AwaitApproval { + } else if bootstrap_policy == BootstrapPolicy::AwaitApproval { info!("Awaiting user approval before bootstrapping modified pipeline."); state.set_phase(PipelinePhase::Initializing( InitializationState::AwaitingApproval(Box::new(diff.clone())), @@ -2604,7 +2612,7 @@ impl CircuitThread { } loop { - match state.bootstrap_policy() { + match state.bootstrap_config().bootstrap_policy { BootstrapPolicy::Allow => { info!( "User approved pipeline changes. Proceeding with initialization." @@ -2694,9 +2702,9 @@ impl CircuitThread { incarnation_uuid, )?; - controller - .status - .set_bootstrap_in_progress(circuit.bootstrap_in_progress()); + let bootstrapping = circuit.bootstrap_in_progress(); + + controller.status.set_bootstrap_in_progress(bootstrapping); let input_metadata = input_metadata.map(|input_metadata| { input_metadata @@ -2712,7 +2720,7 @@ impl CircuitThread { // The pipeline hasn't changed based on input and output persistent id values, // yet the circuit is bootstrapping. This is a bug. - if can_replay && circuit.bootstrap_in_progress() { + if can_replay && bootstrapping { return Err(ControllerError::UnexpectedBootstrap { bootstrap_info: circuit.bootstrap_info().clone(), }); @@ -2733,10 +2741,10 @@ impl CircuitThread { }?; // The above code ensures that replay and bootstrapping cannot happen at the same time. - assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress())); + assert!(!(ft.is_replaying() && bootstrapping)); // Disable journaling while we're bootstrapping the circuit. - if circuit.bootstrap_in_progress() { + if bootstrapping { ft.disable(); } @@ -2754,6 +2762,9 @@ impl CircuitThread { backpressure_thread, storage, parker, + silent_bootstrap: state + .map(|state| state.bootstrap_config().silent_bootstrap) + .unwrap_or(false), checkpoint_delay_warning: None, checkpoint_requests: Vec::new(), running_checkpoint: None, @@ -2764,6 +2775,7 @@ impl CircuitThread { checkpoint_sender, input_metadata: input_metadata.unwrap_or_default(), commit_updates: None, + bootstrapping, }) } @@ -2792,10 +2804,7 @@ impl CircuitThread { // so that if the first step() we perform below before entering the loop // ends up finishing bootstrapping, we will still perform an extra step to initialize // the output table snapshots inside the loop. - let mut trigger = StepTrigger::new( - self.controller.clone(), - self.circuit.bootstrap_in_progress(), - ); + let mut trigger = StepTrigger::new(self.controller.clone()); if config.global.cpu_profiler { self.circuit.enable_cpu_profiler().unwrap_or_else(|e| { error!("Failed to enable CPU profiler: {e}"); @@ -2810,7 +2819,7 @@ impl CircuitThread { // // Skip this during bootstrap to avoid a slow first step. We don't guarantee // that view snapshots are up-to-date until bootstrap is complete. - if !self.circuit.bootstrap_in_progress() + if !self.bootstrapping && let Err(error) = self.step() { let _ = init_status_sender.send(Err(error)); @@ -2884,7 +2893,11 @@ impl CircuitThread { self.last_checkpoint(), self.last_checkpoint_sync(), self.replaying(), - self.circuit.bootstrap_in_progress(), + // `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping is complete, + // which is required to initialize the output snapshots. + // We want the trigger to trigger that extra transaction; therefore we pass `status.bootstrap_in_progress` + // rather than `self.bootstrapping` here. + self.controller.status.bootstrap_in_progress(), self.checkpoint_requested(), self.sync_checkpoint_requested(), coordination_request, @@ -2957,11 +2970,6 @@ impl CircuitThread { transaction_state.into_coordination_status(), )); - // If bootstrapping has completed, update the status flag. - self.controller - .status - .set_bootstrap_in_progress(self.circuit.bootstrap_in_progress()); - // Update `trace_snapshot` to the latest traces. // // We do this before updating `total_processed_records` so that ad hoc @@ -2973,6 +2981,20 @@ impl CircuitThread { .with_category("Step") .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step)) .in_scope(|| self.update_snapshot()); + + let bootstrapping = self.circuit.bootstrap_in_progress(); + + // If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag + // until the circuit performs an extra transaction to initialize output snapshots + // (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()` + // is true). + if self.bootstrapping && !bootstrapping { + self.bootstrapping = false; + } else { + self.controller + .status + .set_bootstrap_in_progress(bootstrapping); + } } // Record that we've processed the records, unless there is a transaction in progress, @@ -3700,16 +3722,36 @@ impl CircuitThread { /// this step. If `processed_records` is `None`, we're in the middle of a /// transaction and the records are not fully processed yet. fn push_output(&mut self, processed_records: Option) { + let silent_bootstrap = + self.silent_bootstrap && self.controller.status.bootstrap_in_progress(); + let outputs = self.controller.outputs.read().unwrap(); for (_stream, (output_handles, endpoints)) in outputs.iter_by_stream() { - let delta_batch = output_handles.delta_handle.as_ref().concat(); - let num_delta_records = delta_batch.len(); + let (mut delta_batch, num_delta_records) = if silent_bootstrap { + let _ = output_handles.delta_handle.take_from_all(); + (None, 0) + } else { + let delta_batch = output_handles.delta_handle.as_ref().concat(); + let num_delta_records = delta_batch.len(); - let mut delta_batch = Some(delta_batch); + (Some(delta_batch), num_delta_records) + }; for (i, endpoint_id) in endpoints.iter().enumerate() { let endpoint = outputs.lookup_by_id(endpoint_id).unwrap(); + // Silent bootstrap: send empty batch for progress tracking only. + if silent_bootstrap { + self.controller.status.enqueue_batch(*endpoint_id, 0); + endpoint.queue.push(BatchQueueEntry { + step: self.step, + data: None, + processed_records, + }); + endpoint.unparker.unpark(); + continue; + } + if endpoint.created_during_transaction_number == self.controller.get_transaction_number() { @@ -4143,10 +4185,6 @@ struct StepTrigger { /// Time between automatic checkpoint syncs. sync_interval: Option, - - /// The circuit is bootstrapping. Used to detect the transition from bootstrapping - /// to normal mode. - bootstrapping: bool, } /// Action for the controller to take. @@ -4167,7 +4205,7 @@ enum Action { impl StepTrigger { /// Returns a new [StepTrigger]. - fn new(controller: Arc, bootstrapping: bool) -> Self { + fn new(controller: Arc) -> Self { let config = &controller.status.pipeline_config.global; let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs); let min_batch_size_records = config.min_batch_size_records; @@ -4187,7 +4225,6 @@ impl StepTrigger { max_buffering_delay, min_batch_size_records, checkpoint_interval, - bootstrapping, sync_interval, } } @@ -4249,16 +4286,7 @@ impl StepTrigger { } _ => Some(Action::Park(None)), } - } else if replaying - || self.controller.transaction_commit_requested() - || bootstrapping - || self.bootstrapping - { - // The `self.bootstrapping` condition above detects a transition - // from bootstrapping to normal operation and makes sure that the - // circuit performs an extra step in the normal mode in order to - // initialize output table snapshots of output relations that did - // not participate in bootstrapping. + } else if replaying || self.controller.transaction_commit_requested() || bootstrapping { Some(Action::Step) } else if timer_expired(next_checkpoint, now) && !checkpoint_requested { Some(Action::Checkpoint) @@ -4325,7 +4353,6 @@ impl StepTrigger { } }; - self.bootstrapping = bootstrapping; if result == Action::Step { self.buffer_timeout = None; } diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index ac61a49c5c9..885f364ec58 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -36,7 +36,6 @@ use actix_web::{ }; use arrow::ipc::writer::StreamWriter; use async_stream; -use atomic::Atomic; use bytes::Bytes; use chrono::Utc; use clap::Parser; @@ -69,11 +68,12 @@ use feldera_types::coordination::{ use feldera_types::format::json::JsonEncoderConfig; use feldera_types::pipeline_diff::PipelineDiff; use feldera_types::query_params::{ - ActivateParams, MetricsFormat, MetricsParameters, SamplyProfileGetParams, SamplyProfileParams, + ActivateParams, ApproveParameters, MetricsFormat, MetricsParameters, SamplyProfileGetParams, + SamplyProfileParams, }; use feldera_types::runtime_status::{ - BootstrapPolicy, ExtendedRuntimeStatus, ExtendedRuntimeStatusError, RuntimeDesiredStatus, - RuntimeStatus, StorageStatusDetails, + BootstrapConfig, BootstrapPolicy, ExtendedRuntimeStatus, ExtendedRuntimeStatusError, + RuntimeDesiredStatus, RuntimeStatus, StorageStatusDetails, }; use feldera_types::suspend::{SuspendError, SuspendableResponse}; use feldera_types::time_series::TimeSeries; @@ -247,8 +247,8 @@ pub(crate) struct ServerState { /// The other locks in this structure nest inside `desired_status`. desired_status: Mutex, - /// Bootstrap policy. - bootstrap_policy: Atomic, + /// Bootstrap configuration. + bootstrap_config: Mutex, /// Notified when `desired_status` or `phase` changes. desired_status_change: Arc, @@ -320,7 +320,7 @@ impl ServerState { phase: PipelinePhase, md: String, desired_status: RuntimeDesiredStatus, - bootstrap_policy: BootstrapPolicy, + bootstrap_config: BootstrapConfig, deployment_id: Uuid, storage: Option>, ) -> Self { @@ -334,7 +334,7 @@ impl ServerState { checkpoint_state: Default::default(), sync_checkpoint_state: Default::default(), desired_status: Mutex::new(desired_status), - bootstrap_policy: Atomic::new(bootstrap_policy), + bootstrap_config: Mutex::new(bootstrap_config), deployment_id, storage, rate_limiter, @@ -350,7 +350,7 @@ impl ServerState { PipelinePhase::InitializationError(Arc::new(error)), String::default(), RuntimeDesiredStatus::Paused, - BootstrapPolicy::Allow, + BootstrapConfig::default(), deployment_id, None, ) @@ -415,12 +415,12 @@ impl ServerState { *self.desired_status.lock().unwrap() } - pub fn bootstrap_policy(&self) -> BootstrapPolicy { - self.bootstrap_policy.load(Ordering::Acquire) + pub fn bootstrap_config(&self) -> BootstrapConfig { + *self.bootstrap_config.lock().unwrap() } - fn set_bootstrap_policy(&self, policy: BootstrapPolicy) { - self.bootstrap_policy.store(policy, Ordering::Release) + fn set_bootstrap_config(&self, config: BootstrapConfig) { + *self.bootstrap_config.lock().unwrap() = config; } fn phase(&self) -> PipelinePhase { @@ -485,6 +485,10 @@ pub struct ServerArgs { #[arg(long, default_value_t = BootstrapPolicy::Allow)] pub bootstrap_policy: BootstrapPolicy, + /// Bootstrap the pipeline with output connectors disabled. + #[arg(long, action = clap::ArgAction::SetTrue)] + pub silent_bootstrap: bool, + /// UUID generated by the runner for the compute resources that were provisioned to keep this /// pipeline process running. It will thus only change if the pipeline is stopped and started /// again. If stored in persistent storage, it can be used to determine upon initialization @@ -662,7 +666,7 @@ pub fn run_server( args.deployment_id ); info!("Desired status from arguments: {:?}", args.initial); - let (initial_status, bootstrap_policy) = match builder + let (initial_status, bootstrap_config) = match builder .storage() .and_then(|storage| StoredStatus::read(&*storage)) .inspect(|stored| { @@ -674,12 +678,20 @@ pub fn run_server( }) { _ if args.initial == RuntimeDesiredStatus::Coordination => { // Always defer to the coordinator if there is one. - (args.initial, args.bootstrap_policy) + ( + args.initial, + BootstrapConfig::from(args.bootstrap_policy) + .with_silent_bootstrap(args.silent_bootstrap), + ) } Some(stored) if stored.deployment_id == args.deployment_id => { // This is an automatic restart (otherwise the deployment ID would // have changed). Use the stored desired status. - (stored.desired_status, stored.bootstrap_policy) + ( + stored.desired_status, + BootstrapConfig::from(stored.bootstrap_policy) + .with_silent_bootstrap(stored.silent_bootstrap), + ) } Some(stored) => { // The pipeline manager restarted us. Use the desired status it @@ -693,13 +705,21 @@ pub fn run_server( to: args.initial, }); } - (args.initial, args.bootstrap_policy) + ( + args.initial, + BootstrapConfig::from(args.bootstrap_policy) + .with_silent_bootstrap(args.silent_bootstrap), + ) } None => { // Initial deployment of this pipeline. Use the desired status // passed in by the pipeline manager (it's the only one we have, // anyway). - (args.initial, args.bootstrap_policy) + ( + args.initial, + BootstrapConfig::from(args.bootstrap_policy) + .with_silent_bootstrap(args.silent_bootstrap), + ) } }; @@ -736,7 +756,7 @@ pub fn run_server( PipelinePhase::Initializing(InitializationState::Starting), md, initial_status, - bootstrap_policy, + bootstrap_config, args.deployment_id, builder.storage().clone(), )); @@ -760,10 +780,11 @@ pub fn run_server( loop { let notify = state.desired_status_change.notified(); let desired_status = state.desired_status(); - let bootstrap_policy = state.bootstrap_policy(); + let bootstrap_config = state.bootstrap_config(); let stored_status = StoredStatus { desired_status, - bootstrap_policy, + bootstrap_policy: bootstrap_config.bootstrap_policy, + silent_bootstrap: bootstrap_config.silent_bootstrap, deployment_id: state.deployment_id, }; if Some(stored_status) != prev_stored_status { @@ -1327,8 +1348,13 @@ async fn activate( } #[post("/approve")] -async fn approve(state: WebData) -> Result { - state.set_bootstrap_policy(BootstrapPolicy::Allow); +async fn approve( + state: WebData, + args: Query, +) -> Result { + state.set_bootstrap_config( + BootstrapConfig::from(BootstrapPolicy::Allow).with_silent_bootstrap(args.silent_bootstrap), + ); // Make sure we don't return until the pipeline has moved on to the next phase // (InitializationStatus::Starting). This makes the API synchronous, so the user can @@ -2811,12 +2837,21 @@ struct StoredStatus { /// Desired status. desired_status: RuntimeDesiredStatus, + /// Bootstrap policy. bootstrap_policy: BootstrapPolicy, + /// Bootstrap the pipeline with output connectors disabled. + #[serde(default, skip_serializing_if = "is_false")] + silent_bootstrap: bool, + // Deployment ID. deployment_id: Uuid, } +fn is_false(value: &bool) -> bool { + !value +} + impl StoredStatus { fn read(storage: &dyn StorageBackend) -> Option { match storage.read_json::(&StoragePath::from(STATUS_FILE)) { @@ -2856,11 +2891,14 @@ mod test_with_kafka { }; use actix_test::TestServer; use actix_web::{App, http::StatusCode, middleware::Logger, web::Data as WebData}; - use feldera_types::runtime_status::RuntimeDesiredStatus; + use csv::ReaderBuilder as CsvReaderBuilder; + use feldera_types::runtime_status::{ + BootstrapConfig, ExtendedRuntimeStatus, RuntimeDesiredStatus, + }; use feldera_types::{ adapter_stats::{ExternalControllerStatus, TransactionStatus}, completion_token::{CompletionStatus, CompletionStatusResponse, CompletionTokenResponse}, - runtime_status::BootstrapPolicy, + runtime_status::{BootstrapPolicy, RuntimeStatus}, }; use proptest::{ strategy::{Strategy, ValueTree}, @@ -2868,12 +2906,14 @@ mod test_with_kafka { }; use serde_json::{self, Value as JsonValue, json}; use std::{ + fs::File, io::Write, + path::Path, thread, thread::sleep, time::{Duration, Instant}, }; - use tempfile::NamedTempFile; + use tempfile::{NamedTempFile, TempDir}; use uuid::Uuid; async fn print_stats(server: &TestServer) { @@ -2892,7 +2932,12 @@ mod test_with_kafka { println!("{stats}") } - async fn start_test_server(config_str: &str, deployment_id: Uuid) -> TestServer { + async fn start_test_server_with_options( + config_str: &str, + deployment_id: Uuid, + bootstrap_config: BootstrapConfig, + persistent_output_ids: &'static [Option<&'static str>], + ) -> TestServer { let mut config_file = NamedTempFile::new().unwrap(); config_file.write_all(config_str.as_bytes()).unwrap(); @@ -2902,7 +2947,7 @@ mod test_with_kafka { PipelinePhase::Initializing(InitializationState::Starting), String::default(), RuntimeDesiredStatus::Paused, - BootstrapPolicy::Allow, + bootstrap_config, deployment_id, None, )); @@ -2918,7 +2963,8 @@ mod test_with_kafka { https_tls_cert_path: None, https_tls_key_path: None, initial: RuntimeDesiredStatus::Paused, - bootstrap_policy: BootstrapPolicy::Allow, + bootstrap_policy: bootstrap_config.bootstrap_policy, + silent_bootstrap: bootstrap_config.silent_bootstrap, deployment_id, host_id: None, }; @@ -2932,7 +2978,7 @@ mod test_with_kafka { Ok(test_circuit::( workers, &TestStruct::schema(), - &[None], + persistent_output_ids, )) }), state_clone, @@ -2952,6 +2998,99 @@ mod test_with_kafka { server } + async fn start_test_server(config_str: &str, deployment_id: Uuid) -> TestServer { + start_test_server_with_options( + config_str, + deployment_id, + BootstrapConfig::from(BootstrapPolicy::Allow), + &[None], + ) + .await + } + + fn test_batches(start: u32, len: u32) -> Vec> { + vec![(start..start + len).map(TestStruct::for_id).collect()] + } + + fn read_file_output(output_path: &Path) -> Vec { + if !output_path.exists() { + return Vec::new(); + } + + let mut actual = CsvReaderBuilder::new() + .has_headers(false) + .from_reader(File::open(output_path).unwrap()) + .deserialize::<(TestStruct, i32)>() + .map(|res| { + let (record, weight) = res.unwrap(); + assert_eq!(weight, 1); + record + }) + .collect::>(); + actual.sort(); + actual + } + + async fn wait_for_status(server: &TestServer, expected: RuntimeStatus) { + async_wait( + || async { + server + .get("/status") + .send() + .await + .unwrap() + .json::() + .await + .unwrap() + .runtime_status + == expected + }, + 20_000, + ) + .await + .unwrap(); + } + + async fn start_pipeline(server: &TestServer) { + println!("/start"); + let resp = server.get("/start").send().await.unwrap(); + assert!(resp.status().is_success()); + wait_for_status(server, RuntimeStatus::Running).await; + } + + async fn suspend_pipeline(server: &TestServer) { + println!("/suspend"); + let resp = server.post("/suspend").send().await.unwrap(); + assert!(resp.status().is_success()); + wait_for_status(server, RuntimeStatus::Suspended).await; + } + + async fn send_input(server: &TestServer, data: &[Vec]) { + let req = server.post("/ingress/test_input1"); + TestHttpSender::send_stream(req, data).await; + } + + async fn wait_for_file_output(output_path: &Path, expected: &[Vec]) { + let mut expected = expected + .iter() + .flat_map(|batch| batch.iter()) + .cloned() + .collect::>(); + expected.sort(); + + async_wait( + || std::future::ready(read_file_output(output_path) == expected), + 20_000, + ) + .await + .unwrap(); + } + + async fn assert_no_file_output(output_path: &Path) { + tokio::time::sleep(Duration::from_millis(2_000)).await; + assert_eq!(read_file_output(output_path), Vec::::new()); + } + #[actix_web::test] async fn test_server() { ensure_default_crypto_provider(); @@ -3013,9 +3152,7 @@ outputs: assert!(buffer_consumer.is_empty()); // Start pipeline. - println!("/start"); - let resp = server.get("/start").send().await.unwrap(); - assert!(resp.status().is_success()); + start_pipeline(&server).await; sleep(Duration::from_millis(3000)); @@ -3061,9 +3198,7 @@ outputs: assert_eq!(buffer_consumer.len(), 0); // Start pipeline; still no data because the endpoint is paused. - println!("/start"); - let resp = server.get("/start").send().await.unwrap(); - assert!(resp.status().is_success()); + start_pipeline(&server).await; sleep(Duration::from_millis(2000)); assert_eq!(buffer_consumer.len(), 0); @@ -3137,9 +3272,7 @@ outputs: .unwrap(); println!("Streaming test"); - let req = server.post("/ingress/test_input1"); - - TestHttpSender::send_stream(req, &data).await; + send_input(&server, &data).await; println!("data sent"); buffer_consumer.wait_for_output_unordered(&data); @@ -3207,9 +3340,7 @@ outputs: buffer_consumer.clear(); - println!("/start"); - let resp = server.get("/start").send().await.unwrap(); - assert!(resp.status().is_success()); + start_pipeline(&server).await; sleep(Duration::from_millis(5000)); @@ -3222,6 +3353,84 @@ outputs: drop(kafka_resources); } + #[actix_web::test] + async fn test_silent_bootstrap() { + ensure_default_crypto_provider(); + + let tempdir = TempDir::new().unwrap(); + let storage_dir = tempdir.path().join("storage"); + let output_path = tempdir.path().join("output.csv"); + std::fs::create_dir(&storage_dir).unwrap(); + + let config_str = format!( + r#" +name: test +workers: 4 +storage_config: + path: "{}" +storage: true +clock_resolution_usecs: +inputs: +outputs: + test_output1: + stream: test_output1 + transport: + name: file_output + config: + path: "{}" + format: + name: csv + config: {{}} +"#, + storage_dir.display(), + output_path.display() + ); + + let first_batch = test_batches(0, 10); + let second_batch = test_batches(10, 10); + let third_batch = test_batches(20, 10); + + let server = start_test_server_with_options( + &config_str, + Uuid::new_v4(), + BootstrapConfig::from(BootstrapPolicy::Allow), + &[Some("v0")], + ) + .await; + start_pipeline(&server).await; + send_input(&server, &first_batch).await; + wait_for_file_output(&output_path, &first_batch).await; + suspend_pipeline(&server).await; + drop(server); + + let server = start_test_server_with_options( + &config_str, + Uuid::new_v4(), + BootstrapConfig::from(BootstrapPolicy::Allow).with_silent_bootstrap(true), + &[Some("v1")], + ) + .await; + assert_no_file_output(&output_path).await; + start_pipeline(&server).await; + send_input(&server, &second_batch).await; + wait_for_file_output(&output_path, &second_batch).await; + suspend_pipeline(&server).await; + drop(server); + + let server = start_test_server_with_options( + &config_str, + Uuid::new_v4(), + BootstrapConfig::from(BootstrapPolicy::Allow).with_silent_bootstrap(true), + &[Some("v2")], + ) + .await; + assert_no_file_output(&output_path).await; + start_pipeline(&server).await; + send_input(&server, &third_batch).await; + wait_for_file_output(&output_path, &third_batch).await; + suspend_pipeline(&server).await; + } + #[actix_web::test] async fn test_transactions() { ensure_default_crypto_provider(); @@ -3243,9 +3452,7 @@ outputs: let server = start_test_server(config_str, Uuid::default()).await; // Start pipeline. - println!("/start"); - let resp = server.get("/start").send().await.unwrap(); - assert!(resp.status().is_success()); + start_pipeline(&server).await; println!("/start_transaction"); let resp = server.post("/start_transaction").send().await.unwrap(); @@ -3258,9 +3465,7 @@ outputs: .await .unwrap(); - let req = server.post("/ingress/test_input1"); - - TestHttpSender::send_stream(req, &data).await; + send_input(&server, &data).await; println!("data sent"); println!("/commit_transaction"); @@ -3295,9 +3500,7 @@ outputs: "#; let server = start_test_server(config_str, Uuid::default()).await; - println!("/start"); - let resp = server.get("/start").send().await.unwrap(); - assert!(resp.status().is_success()); + start_pipeline(&server).await; println!("/start_transaction"); let resp = server.post("/start_transaction").send().await.unwrap(); diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index 619bb2d0a94..97a67cbd18e 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -1497,10 +1497,20 @@ pub(crate) fn register_replay_stream( // We currently only support using operators in the top-level circuit // as replay sources. if TypeId::of::<()>() == TypeId::of::() { - circuit.cache_insert( - ReplaySource::new(stream.stream_id()), - Box::new(replay_stream.clone()), - ); + // If a replay source already exists, don't overwrite it. This normally shouldn't + // happen as we should not have more than one integral for each stream. One situation + // where this does happen today is for input streams that have an integral without + // an accumulator as part of input_upsert, and another integral with an accumulator + // created by a downstream join or aggregate. In this case, we want to use the former + // for replay, as the latter may have been added in the new version of the program + // and may be empty, while the former can have state (conversely, if the input integral + // is empty, the downstream integral is guaranteed to be empty too). + if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) { + circuit.cache_insert( + ReplaySource::new(stream.stream_id()), + Box::new(replay_stream.clone()), + ); + } } } @@ -7597,10 +7607,15 @@ impl CircuitHandle { return true; }; - replay_info.replay_sources.keys().all(|node_id| { + // Bootstrapping is finished when all replay sources have completed their replay and the + // transaction has been committed. + + let all_complete = replay_info.replay_sources.keys().all(|node_id| { self.circuit .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete()) - }) + }); + + all_complete && self.is_commit_complete() } /// Finalize the replay phase of the circuit. diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 42a277fb32e..2ae439dfd82 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -1397,20 +1397,6 @@ impl DBSPHandle { Ok(progress) } - pub fn set_replay_step_size(&mut self, step_size: usize) { - if let Some(handle) = self.runtime.as_ref() { - handle.runtime().set_replay_step_size(step_size); - } - } - - pub fn get_replay_step_size(&self) -> usize { - if let Some(handle) = self.runtime.as_ref() { - handle.runtime().get_replay_step_size() - } else { - 0 - } - } - /// The circuit has been resumed from a checkpoint and is currently bootstrapping the modified part of the circuit. pub fn bootstrap_in_progress(&self) -> bool { self.bootstrap_info.is_some() diff --git a/crates/dbsp/src/circuit/replay_tests.rs b/crates/dbsp/src/circuit/replay_tests.rs index 37cd54f995b..a230329c476 100644 --- a/crates/dbsp/src/circuit/replay_tests.rs +++ b/crates/dbsp/src/circuit/replay_tests.rs @@ -1,9 +1,10 @@ use feldera_types::config::StorageConfig; use crate::{ - CmpFunc, DBData, OrdZSet, OutputHandle, RootCircuit, Runtime, Stream, ZSetHandle, ZWeight, + CmpFunc, DBData, IndexedZSetHandle, OrdIndexedZSet, OrdZSet, OutputHandle, RootCircuit, + Runtime, Stream, ZSetHandle, ZWeight, circuit::dbsp_handle::CircuitStorageConfig, - default_hash, + default_hash, indexed_zset, operator::{ Max, Min, time_series::{RelOffset, RelRange}, @@ -148,52 +149,30 @@ type CircuitFn = Arc< ///``` /// /// The common part of the two circuits must return identical results. -fn test_replay( - circuit_constructor1: CircuitFn, - circuit_constructor2: CircuitFn, - inputs1: Vec, - inputs2_1: Vec, - inputs2_2: Vec, - inputs3: Vec, -) where - I1: TestDataType, - I2: TestDataType, - I3: TestDataType, - O1: TestDataType, - O2: TestDataType, - O3: TestDataType, -{ - // Run with replay step size < splitter chunk size. - test_replay_with_step_size::( - circuit_constructor1.clone(), - circuit_constructor2.clone(), - inputs1.clone(), - inputs2_1.clone(), - inputs2_2.clone(), - inputs3.clone(), - Some(1), - ); - // Run without replay step size > splitter chunk size. - test_replay_with_step_size::( - circuit_constructor1, - circuit_constructor2, - inputs1, - inputs2_1, - inputs2_2, - inputs3, - None, - ); +fn circuit_config(path: &PathBuf) -> CircuitConfig { + CircuitConfig::with_workers(NUM_WORKERS) + .with_splitter_chunk_size_records(2) + .with_mode(Mode::Persistent) + .with_storage(Some( + CircuitStorageConfig::for_config( + StorageConfig { + path: path.to_string_lossy().into_owned(), + cache: Default::default(), + }, + Default::default(), + ) + .unwrap(), + )) } -fn test_replay_with_step_size( +fn test_replay( circuit_constructor1: CircuitFn, circuit_constructor2: CircuitFn, inputs1: Vec, inputs2_1: Vec, inputs2_2: Vec, inputs3: Vec, - replay_step_size: Option, ) where I1: TestDataType, I2: TestDataType, @@ -210,22 +189,6 @@ fn test_replay_with_step_size( let path = tempfile::tempdir().unwrap().keep(); println!("Running replay_test in {}", path.display()); - fn circuit_config(path: &PathBuf) -> CircuitConfig { - CircuitConfig::with_workers(NUM_WORKERS) - .with_splitter_chunk_size_records(2) - .with_mode(Mode::Persistent) - .with_storage(Some( - CircuitStorageConfig::for_config( - StorageConfig { - path: path.to_string_lossy().into_owned(), - cache: Default::default(), - }, - Default::default(), - ) - .unwrap(), - )) - } - // Create both reference circuits, feed I1 and I2 to circuit1; feed I2 and I3 to circuit2. let mut reference_output1 = Vec::new(); let mut reference_output2 = Vec::new(); @@ -343,10 +306,6 @@ fn test_replay_with_step_size( }) .unwrap(); - if let Some(replay_step_size) = replay_step_size { - circuit.set_replay_step_size(replay_step_size); - } - while circuit.bootstrap_in_progress() { circuit.transaction().unwrap(); } @@ -1690,3 +1649,252 @@ fn test_rolling_circuit() { std::iter::repeat_n((), 20).collect(), ); } + +// Regression test: +// +// Pipeline 1: +// ---> input_map +// +// Pipeline 2: +// ---> input_map ---> aggregate --> output +// +// The second pipeline should replay the input from the input_map operator. +// A bug prevented this from happening, because the integral built by the +// aggregate operator was used to replay instead. + +#[test] +fn regression1() { + init_test_logger(); + + let path = tempfile::tempdir().unwrap().keep(); + + let (mut circuit1, input_handle1) = + Runtime::init_circuit(circuit_config(&path), move |circuit| { + let (input_stream, input_handle) = circuit + .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); + input_stream.set_persistent_id(Some("input_map")); + Ok(input_handle) + }) + .unwrap(); + + input_handle1.push(0, crate::operator::Update::Insert(0)); + + circuit1.transaction().unwrap(); + + // Checkpoint. + let checkpoint = circuit1.checkpoint().run().unwrap(); + circuit1.kill().unwrap(); + + // Restart the second circuit from the checkpoint. + let mut circuit_config = circuit_config(&path); + circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid); + + let (mut circuit2, (_input_handle2, output_handle2)) = + Runtime::init_circuit(circuit_config, move |circuit| { + let (input_stream, input_handle) = circuit + .add_input_map_persistent::(Some("input_map"), |v, u| *v = *u); + input_stream.set_persistent_id(Some("input_map")); + + let aggregate = input_stream + .aggregate_persistent(Some("aggregate1"), Max) + .set_persistent_id(Some("aggregate1")); + + let output_handle = aggregate + .accumulate_trace() + .apply(|trace| trace.ro_snapshot().consolidate()) + .output_persistent(Some("output")); + Ok((input_handle, output_handle)) + }) + .unwrap(); + + while circuit2.bootstrap_in_progress() { + circuit2.transaction().unwrap(); + } + println!("Replay finished"); + + let actual_output = &output_handle2.concat().consolidate(); + + // The bug causes the output to be empty. + assert_eq!(actual_output, &indexed_zset!(0 => {0 => 1})); +} + +/// Unit test for the replay behavior of Z1Trace and AccumulateZ1Trace operators. +/// Operators must correctly replay their contents during bootstrap as one atomic transaction. + +#[derive(Clone, Copy, Debug)] +enum ReplayTraceKind { + IntegrateTrace, + AccumulateTrace, +} + +type IndexedReplayBatch = Vec>>; + +fn add_replay_trace( + stream: &Stream>, + trace_kind: ReplayTraceKind, +) { + match trace_kind { + ReplayTraceKind::IntegrateTrace => { + stream.integrate_trace(); + } + ReplayTraceKind::AccumulateTrace => { + stream.accumulate_trace(); + } + } +} + +fn transactional_bootstrap_circuit1( + circuit: &mut RootCircuit, + trace_kind: ReplayTraceKind, +) -> IndexedZSetHandle { + let (input_stream, input_handle) = circuit.add_input_indexed_zset::(); + input_stream.set_persistent_id(Some("input")); + add_replay_trace(&input_stream, trace_kind); + input_handle +} + +fn transactional_bootstrap_circuit2( + circuit: &mut RootCircuit, + trace_kind: ReplayTraceKind, +) -> ( + IndexedZSetHandle, + OutputHandle>>, +) { + let (input_stream, input_handle) = circuit.add_input_indexed_zset::(); + input_stream.set_persistent_id(Some("input")); + add_replay_trace(&input_stream, trace_kind); + + let output_handle = input_stream.accumulate_output_persistent(Some("output")); + + (input_handle, output_handle) +} + +fn replay_batch_to_indexed_zset(batches: &[IndexedReplayBatch]) -> OrdIndexedZSet { + OrdIndexedZSet::from_tuples( + (), + batches + .iter() + .flatten() + .map(|Tup2(key, Tup2(value, weight))| Tup2(Tup2(*key, *value), *weight)) + .collect(), + ) +} + +fn run_transactional_bootstrap_test( + trace_kind: ReplayTraceKind, + batches: Vec, + expect_multistep_replay: bool, +) { + init_test_logger(); + + let path = tempfile::tempdir().unwrap().keep(); + let expected = replay_batch_to_indexed_zset(&batches); + + let checkpoint = { + let (mut circuit, input_handle) = + Runtime::init_circuit(circuit_config(&path), move |circuit| { + Ok(transactional_bootstrap_circuit1(circuit, trace_kind)) + }) + .unwrap(); + + for mut batch in batches.clone() { + input_handle.append(&mut batch); + circuit.transaction().unwrap(); + } + + let checkpoint = circuit.checkpoint().run().unwrap(); + circuit.kill().unwrap(); + checkpoint + }; + + let mut circuit_config = circuit_config(&path); + circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid); + + let (mut circuit, (_input_handle, output_handle)) = + Runtime::init_circuit(circuit_config, move |circuit| { + Ok(transactional_bootstrap_circuit2(circuit, trace_kind)) + }) + .unwrap(); + + assert_eq!(output_handle.num_nonempty_mailboxes(), 0); + + if circuit.bootstrap_in_progress() { + circuit.start_transaction().unwrap(); + circuit.start_commit_transaction().unwrap(); + + let mut incomplete_commit_steps = 0; + loop { + let commit_complete = circuit.step().unwrap(); + if commit_complete { + break; + } + + incomplete_commit_steps += 1; + } + + if expect_multistep_replay { + assert!( + incomplete_commit_steps > 0, + "{trace_kind:?} replay finished in a single commit step despite the splitter chunk size" + ); + } + } + + assert!(!circuit.bootstrap_in_progress()); + assert_eq!(output_handle.concat().consolidate(), expected); + + circuit.kill().unwrap(); +} + +fn transactional_bootstrap_cases() -> Vec<(Vec, bool)> { + vec![ + (vec![], false), + (vec![vec![Tup2(1, Tup2(10, 1))]], false), + ( + vec![vec![Tup2(1, Tup2(10, 1)), Tup2(1, Tup2(11, 1))]], + false, + ), + ( + vec![ + vec![ + Tup2(1, Tup2(10, 1)), + Tup2(1, Tup2(11, 1)), + Tup2(2, Tup2(20, 1)), + ], + vec![ + Tup2(1, Tup2(11, -1)), + Tup2(1, Tup2(12, 1)), + Tup2(4, Tup2(40, 2)), + Tup2(5, Tup2(50, 2)), + Tup2(6, Tup2(50, 2)), + Tup2(7, Tup2(50, 2)), + Tup2(8, Tup2(50, 2)), + Tup2(9, Tup2(50, 2)), + ], + ], + true, + ), + ] +} + +#[test] +fn test_integrate_trace_bootstrap_is_transactional() { + for (batches, expect_multistep_replay) in transactional_bootstrap_cases() { + run_transactional_bootstrap_test( + ReplayTraceKind::IntegrateTrace, + batches, + expect_multistep_replay, + ); + } +} + +#[test] +fn test_accumulate_trace_bootstrap_is_transactional() { + for (batches, expect_multistep_replay) in transactional_bootstrap_cases() { + run_transactional_bootstrap_test( + ReplayTraceKind::AccumulateTrace, + batches, + expect_multistep_replay, + ); + } +} diff --git a/crates/dbsp/src/circuit/runtime.rs b/crates/dbsp/src/circuit/runtime.rs index 33b760126af..37cfdba62b0 100644 --- a/crates/dbsp/src/circuit/runtime.rs +++ b/crates/dbsp/src/circuit/runtime.rs @@ -64,9 +64,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use typedmap::TypedDashMap; -/// The number of tuples a stateful operator outputs per step during replay. -pub const DEFAULT_REPLAY_STEP_SIZE: usize = 10000; - #[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub enum Error { /// Specified persistent id is not found in the circuit. @@ -304,7 +301,6 @@ struct RuntimeInner { /// Panic info collected from failed worker threads. panic_info: Vec>>>, panicked: AtomicBool, - replay_step_size: AtomicUsize, /// Tokio runtime that runs async merger tasks (see `AsyncMerger`). tokio_merger_runtime: Mutex>, @@ -511,7 +507,6 @@ impl RuntimeInner { .map(|_| EnumMap::from_fn(|_| RwLock::new(None))) .collect(), panicked: AtomicBool::new(false), - replay_step_size: AtomicUsize::new(DEFAULT_REPLAY_STEP_SIZE), tokio_merger_runtime: Mutex::new(None), exchange_listener: Mutex::new(config.exchange_listener), }) @@ -1063,29 +1058,6 @@ impl Runtime { self.inner().step_size } - /// Configure the number of tuples a stateful operator outputs per step during replay. - /// - /// The default is `DEFAULT_REPLAY_STEP_SIZE`. - pub fn set_replay_step_size(&self, step_size: usize) { - self.inner() - .replay_step_size - .store(step_size, Ordering::Release); - } - - /// Get currently configured replay step size. - /// - /// Returns `DEFAULT_REPLAY_STEP_SIZE` if the current thread doesn't have a runtime. - pub fn replay_step_size() -> usize { - RUNTIME - .with(|rt| Some(rt.borrow().as_ref()?.get_replay_step_size())) - .unwrap_or(DEFAULT_REPLAY_STEP_SIZE) - } - - /// Get currently configured replay step size. - pub fn get_replay_step_size(&self) -> usize { - self.inner().replay_step_size.load(Ordering::Acquire) - } - /// Returns the worker index as a string. /// /// This is useful for metric labels. diff --git a/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs b/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs index d631e57752f..75581b41221 100644 --- a/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs +++ b/crates/dbsp/src/circuit/schedule/dynamic_scheduler.rs @@ -147,9 +147,6 @@ impl Notifications { } enum TransactionPhase { - /// Not started - Idle, - /// Started, but not yet committing. Started, @@ -360,7 +357,7 @@ impl Inner { notifications: Notifications::new(num_async_nodes), handles: JoinSet::new(), waiting: false, - transaction_phase: TransactionPhase::Idle, + transaction_phase: TransactionPhase::CommitComplete, global_commit_consensus: Broadcast::new(), metadata_broadcast: Broadcast::new(), before_first_step: true, diff --git a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs index 829be69c111..0fef3ebbf65 100644 --- a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs +++ b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs @@ -1,6 +1,6 @@ -use crate::Runtime; use crate::circuit::circuit_builder::{StreamId, register_replay_stream}; use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS}; +use crate::circuit::splitter_output_chunk_size; use crate::dynamic::{Factory, Weight, WeightTrait}; use crate::operator::dynamic::trace::{DelayedTraceId, TraceBounds}; use crate::operator::{TraceBound, require_persistent_id}; @@ -1117,7 +1117,7 @@ where { fn get_output(&mut self) -> T { //println!("Z1-{}::get_output", &self.global_id); - let replay_step_size = Runtime::replay_step_size(); + let replay_step_size = splitter_output_chunk_size(); if self.replay_mode { // One output per transaction. @@ -1161,6 +1161,7 @@ where self.delta_stream.as_ref().unwrap().value().put(batch); if !replay.borrow_cursor().key_valid() { self.replay_state = None; + self.flush_output = false; } } else { // Continue producing empty outputs as long as the circuit is in the replay mode. @@ -1169,11 +1170,12 @@ where .unwrap() .value() .put(B::dyn_empty(&self.batch_factories)); + self.flush_output = false; } + } else { + self.flush_output = false; } - self.flush_output = false; - let mut result = self.trace.take().unwrap(); result.clear_dirty_flag(); result diff --git a/crates/dbsp/src/operator/dynamic/trace.rs b/crates/dbsp/src/operator/dynamic/trace.rs index 2d90686a3f2..85270e14630 100644 --- a/crates/dbsp/src/operator/dynamic/trace.rs +++ b/crates/dbsp/src/operator/dynamic/trace.rs @@ -1,6 +1,6 @@ -use crate::Runtime; use crate::circuit::circuit_builder::{StreamId, register_replay_stream}; use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS}; +use crate::circuit::splitter_output_chunk_size; use crate::dynamic::{Factory, Weight, WeightTrait}; use crate::operator::require_persistent_id; use crate::trace::spine_async::WithSnapshot; @@ -1184,7 +1184,7 @@ where { fn get_output(&mut self) -> T { //println!("Z1-{}::get_output", &self.global_id); - let replay_step_size = Runtime::replay_step_size(); + let replay_step_size = splitter_output_chunk_size(); if self.replay_mode { // One output per transaction. @@ -1228,6 +1228,7 @@ where self.delta_stream.as_ref().unwrap().value().put(batch); if !replay.borrow_cursor().key_valid() { self.replay_state = None; + self.flush_output = false; } } else { // Continue producing empty outputs as long as the circuit is in the replay mode. @@ -1236,11 +1237,12 @@ where .unwrap() .value() .put(B::dyn_empty(&self.batch_factories)); + self.flush_output = false; } + } else { + self.flush_output = false; } - self.flush_output = false; - let mut result = self.trace.take().unwrap(); result.clear_dirty_flag(); result diff --git a/crates/fda/src/bench.rs b/crates/fda/src/bench.rs index 30fb977d192..fddecf19533 100644 --- a/crates/fda/src/bench.rs +++ b/crates/fda/src/bench.rs @@ -547,6 +547,7 @@ pub(crate) async fn bench(client: Client, format: OutputFormat, args: BenchmarkA no_wait: false, initial: "running".to_string(), bootstrap_policy: "allow".to_string(), + silent_bootstrap: false, no_dismiss_error: false, }, client.clone(), diff --git a/crates/fda/src/cli.rs b/crates/fda/src/cli.rs index d22f5ace8b5..a5a31ca5149 100644 --- a/crates/fda/src/cli.rs +++ b/crates/fda/src/cli.rs @@ -352,6 +352,9 @@ pub enum PipelineAction { // TODO: auto-complete #[arg(long, short = 'b', default_value = "await_approval")] bootstrap_policy: String, + /// Bootstrap the pipeline with output connectors disabled. + #[arg(long, default_value_t = false)] + silent_bootstrap: bool, /// Do not dismiss any deployment error before starting. #[arg(long, default_value_t = false)] no_dismiss_error: bool, @@ -362,6 +365,9 @@ pub enum PipelineAction { /// The name of the pipeline. #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] name: String, + /// Bootstrap the pipeline with output connectors disabled. + #[arg(long, default_value_t = false)] + silent_bootstrap: bool, }, /// Checkpoint a fault-tolerant pipeline. @@ -415,6 +421,9 @@ pub enum PipelineAction { /// The bootstrap policy to use. #[arg(long, short = 'b', default_value = "await_approval")] bootstrap_policy: String, + /// Bootstrap the pipeline with output connectors disabled. + #[arg(long, default_value_t = false)] + silent_bootstrap: bool, /// Do not dismiss any deployment error before starting. #[arg(long, default_value_t = false)] no_dismiss_error: bool, @@ -614,6 +623,9 @@ pub enum PipelineAction { /// The bootstrap policy to use. #[arg(long, short = 'b', default_value = "await_approval")] bootstrap_policy: String, + /// Bootstrap the pipeline with output connectors disabled. + #[arg(long, default_value_t = false, requires("start"))] + silent_bootstrap: bool, /// Do not dismiss any deployment error before starting. #[arg(long, default_value_t = false, requires("start"))] no_dismiss_error: bool, diff --git a/crates/fda/src/main.rs b/crates/fda/src/main.rs index e75f97b9189..095c96716d9 100644 --- a/crates/fda/src/main.rs +++ b/crates/fda/src/main.rs @@ -743,6 +743,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) no_wait, initial, bootstrap_policy, + silent_bootstrap, no_dismiss_error, } => { if initial != "standby" && initial != "paused" && initial != "running" { @@ -863,6 +864,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .pipeline_name(name.clone()) .initial(&initial) .bootstrap_policy(&bootstrap_policy) + .silent_bootstrap(silent_bootstrap) .dismiss_error(false) // It has already been separately dismissed .send() .await @@ -958,10 +960,14 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) ); } } - PipelineAction::Approve { name } => { + PipelineAction::Approve { + name, + silent_bootstrap, + } => { let response = client .post_pipeline_approve() .pipeline_name(name.clone()) + .silent_bootstrap(silent_bootstrap) .send() .await .map_err(handle_errors_fatal( @@ -1030,6 +1036,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) no_wait, initial, bootstrap_policy, + silent_bootstrap, no_dismiss_error, } => { let current_status = client @@ -1077,6 +1084,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) no_wait, initial, bootstrap_policy, + silent_bootstrap, no_dismiss_error, }, client, @@ -1664,6 +1672,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) start, initial, bootstrap_policy, + silent_bootstrap, no_dismiss_error, } => { let client2 = client.clone(); @@ -1676,6 +1685,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) no_wait: false, initial, bootstrap_policy, + silent_bootstrap, no_dismiss_error, }, client, diff --git a/crates/feldera-types/src/query_params.rs b/crates/feldera-types/src/query_params.rs index 85926fd6b66..7e6f2425c63 100644 --- a/crates/feldera-types/src/query_params.rs +++ b/crates/feldera-types/src/query_params.rs @@ -72,3 +72,11 @@ pub struct SamplyProfileGetParams { /// If false or not provided, returns the last collected profile. pub latest: bool, } + +/// Query parameters to the `/approve` endpoint. +#[derive(Debug, Deserialize, IntoParams, ToSchema)] +pub struct ApproveParameters { + /// Bootstrap the pipeline with output connectors disabled. + #[serde(default)] + pub silent_bootstrap: bool, +} diff --git a/crates/feldera-types/src/runtime_status.rs b/crates/feldera-types/src/runtime_status.rs index 71ef1b1100a..813b32015d2 100644 --- a/crates/feldera-types/src/runtime_status.rs +++ b/crates/feldera-types/src/runtime_status.rs @@ -223,6 +223,33 @@ impl Display for BootstrapPolicy { } } +/// Bootstrap-related configuration for a deployment start request. +#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct BootstrapConfig { + /// Bootstrap policy. + pub bootstrap_policy: BootstrapPolicy, + /// Bootstrap the pipeline with output connectors disabled. + pub silent_bootstrap: bool, +} + +impl From for BootstrapConfig { + fn from(bootstrap_policy: BootstrapPolicy) -> Self { + Self { + bootstrap_policy, + silent_bootstrap: false, + } + } +} + +impl BootstrapConfig { + pub fn with_silent_bootstrap(self, silent_bootstrap: bool) -> Self { + Self { + silent_bootstrap, + ..self + } + } +} + /// Details about pipeline storage, which are returned as part of the regular runtime status polling /// by the runner. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index 007e2a5a072..817f4e8c6e8 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -15,7 +15,9 @@ use actix_web::{ web::{self, Data as WebData, ReqData}, HttpRequest, HttpResponse, }; -use feldera_types::query_params::{MetricsParameters, SamplyProfileGetParams, SamplyProfileParams}; +use feldera_types::query_params::{ + ApproveParameters, MetricsParameters, SamplyProfileGetParams, SamplyProfileParams, +}; use feldera_types::{program_schema::SqlIdentifier, query_params::ActivateParams}; use std::time::Duration; use tracing::{debug, info}; @@ -1923,6 +1925,7 @@ pub(crate) async fn post_pipeline_activate( security(("JSON web token (JWT) or API key" = [])), params( ("pipeline_name" = String, Path, description = "Unique pipeline name"), + ApproveParameters, ), responses( (status = ACCEPTED diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs index f90b2549346..08524809ff1 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs @@ -30,7 +30,9 @@ use feldera_types::adapter_stats::PipelineStatsErrorsResponse; use feldera_types::config::{InputEndpointConfig, OutputEndpointConfig, RuntimeConfig}; use feldera_types::error::ErrorResponse; use feldera_types::program_schema::ProgramSchema; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; +use feldera_types::runtime_status::{ + BootstrapConfig, BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus, +}; use futures_util::future::join_all; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -344,7 +346,8 @@ pub struct PipelineSelectedInfoInternal { pub deployment_runtime_status_since: Option>, pub deployment_runtime_desired_status: Option, pub deployment_runtime_desired_status_since: Option>, - pub bootstrap_policy: Option, + #[serde(flatten)] + pub bootstrap_config: Option, #[serde(skip_serializing_if = "Option::is_none")] pub connectors: Option, } @@ -408,7 +411,7 @@ impl PipelineSelectedInfoInternal { deployment_runtime_desired_status: extended_pipeline.deployment_runtime_desired_status, deployment_runtime_desired_status_since: extended_pipeline .deployment_runtime_desired_status_since, - bootstrap_policy: extended_pipeline.bootstrap_policy, + bootstrap_config: extended_pipeline.bootstrap_config, connectors: None, } } @@ -467,7 +470,7 @@ impl PipelineSelectedInfoInternal { deployment_runtime_desired_status: extended_pipeline.deployment_runtime_desired_status, deployment_runtime_desired_status_since: extended_pipeline .deployment_runtime_desired_status_since, - bootstrap_policy: extended_pipeline.bootstrap_policy, + bootstrap_config: extended_pipeline.bootstrap_config, connectors: None, } } @@ -682,8 +685,12 @@ pub struct PostStartPipelineParameters { /// become `standby`, `paused` or `running` (only valid values). #[serde(default = "default_pipeline_start_desired")] initial: String, + /// Bootstrap policy. #[serde(default)] bootstrap_policy: BootstrapPolicy, + /// Bootstrap the pipeline with output connectors disabled. + #[serde(default)] + silent_bootstrap: bool, #[serde(default = "default_pipeline_start_dismiss_error")] dismiss_error: bool, } @@ -1358,9 +1365,15 @@ pub(crate) async fn post_pipeline_start( let PostStartPipelineParameters { initial, bootstrap_policy, + silent_bootstrap, dismiss_error, } = query.into_inner(); + let bootstrap_config = BootstrapConfig { + bootstrap_policy, + silent_bootstrap, + }; + let pipeline_id = match initial.as_str() { "standby" => { state @@ -1371,7 +1384,7 @@ pub(crate) async fn post_pipeline_start( *tenant_id, &pipeline_name, RuntimeDesiredStatus::Standby, - bootstrap_policy, + bootstrap_config, dismiss_error, ) .await? @@ -1385,7 +1398,7 @@ pub(crate) async fn post_pipeline_start( *tenant_id, &pipeline_name, RuntimeDesiredStatus::Paused, - bootstrap_policy, + bootstrap_config, dismiss_error, ) .await? @@ -1399,7 +1412,7 @@ pub(crate) async fn post_pipeline_start( *tenant_id, &pipeline_name, RuntimeDesiredStatus::Running, - bootstrap_policy, + bootstrap_config, dismiss_error, ) .await? diff --git a/crates/pipeline-manager/src/api/examples.rs b/crates/pipeline-manager/src/api/examples.rs index 7839f8557fe..d246c5e339d 100644 --- a/crates/pipeline-manager/src/api/examples.rs +++ b/crates/pipeline-manager/src/api/examples.rs @@ -70,7 +70,7 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr { storage_status_details: None, deployment_id: None, deployment_initial: None, - bootstrap_policy: None, + bootstrap_config: None, deployment_resources_status: ResourcesStatus::Stopped, deployment_resources_status_details: None, deployment_resources_status_since: Default::default(), @@ -157,7 +157,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr { storage_status_details: None, deployment_id: None, deployment_initial: None, - bootstrap_policy: None, + bootstrap_config: None, deployment_resources_status: ResourcesStatus::Stopped, deployment_resources_status_details: None, deployment_resources_status_since: Default::default(), diff --git a/crates/pipeline-manager/src/api/support_data_collector.rs b/crates/pipeline-manager/src/api/support_data_collector.rs index 02ff2d1a33c..93688774740 100644 --- a/crates/pipeline-manager/src/api/support_data_collector.rs +++ b/crates/pipeline-manager/src/api/support_data_collector.rs @@ -1263,7 +1263,7 @@ mod tests { use crate::db::types::pipeline::PipelineDescr; use crate::db::types::program::{RustCompilationInfo, SqlCompilationInfo}; use crate::db::types::version::Version; - use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; + use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; use serde_json::json; use std::sync::Arc; use tokio::sync::Mutex; @@ -1484,7 +1484,7 @@ mod tests { tenant_id, "test_pipeline", RuntimeDesiredStatus::Running, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -1681,7 +1681,7 @@ mod tests { tenant_id, "test_pipeline", RuntimeDesiredStatus::Running, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -1859,7 +1859,7 @@ mod tests { tenant_id, "test_pipeline", RuntimeDesiredStatus::Running, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -1967,7 +1967,7 @@ mod tests { tenant_id, "test_pipeline", RuntimeDesiredStatus::Running, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await diff --git a/crates/pipeline-manager/src/db/operations/pipeline.rs b/crates/pipeline-manager/src/db/operations/pipeline.rs index 38f0c9fc301..fa93eb92275 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline.rs @@ -10,7 +10,7 @@ use crate::db::operations::utils::{ maybe_tenant_id_foreign_key_constraint_err, maybe_unique_violation, }; use crate::db::types::pipeline::{ - bootstrap_policy_to_string, runtime_desired_status_to_string, runtime_status_to_string, + bootstrap_config_to_string, runtime_desired_status_to_string, runtime_status_to_string, ExtendedPipelineDescr, ExtendedPipelineDescrEventInfo, ExtendedPipelineDescrMonitoring, PipelineDescr, PipelineId, }; @@ -31,7 +31,7 @@ use crate::db::types::utils::{ use crate::db::types::version::Version; use deadpool_postgres::Transaction; use feldera_types::error::ErrorResponse; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; +use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; use rmp_serde::{from_slice, to_vec}; use serde_json::json; use tokio_postgres::Row; @@ -889,7 +889,7 @@ pub(crate) async fn set_deployment_resources_desired_status( pipeline_name: &str, new_desired_status: ResourcesDesiredStatus, initial_runtime_desired_status: Option, - bootstrap_policy: Option, + bootstrap_config: Option, dismiss_error: bool, ) -> Result { let current = get_pipeline(txn, tenant_id, pipeline_name).await?; @@ -917,20 +917,20 @@ pub(crate) async fn set_deployment_resources_desired_status( // runner notices. Otherwise, another (not desired) status transition will set it // to NULL. // - Provisioned: new value - let (final_deployment_initial, final_bootstrap_policy) = match new_desired_status { + let (final_deployment_initial, final_bootstrap_config) = match new_desired_status { ResourcesDesiredStatus::Stopped => { check_precondition( initial_runtime_desired_status.is_none(), "initial_runtime_desired_status should be None when becoming desired Stopped", )?; check_precondition( - bootstrap_policy.is_none(), - "bootstrap_policy should be None when becoming desired Stopped", + bootstrap_config.is_none(), + "bootstrap_config should be None when becoming desired Stopped", )?; if current.deployment_resources_status == ResourcesStatus::Stopped { (None, None) } else { - (current.deployment_initial, current.bootstrap_policy) + (current.deployment_initial, current.bootstrap_config) } } ResourcesDesiredStatus::Provisioned => { @@ -939,10 +939,10 @@ pub(crate) async fn set_deployment_resources_desired_status( "initial_runtime_desired_status should be Some when becoming desired Provisioned", )?; check_precondition( - bootstrap_policy.is_some(), - "bootstrap_policy should be Some when becoming desired Provisioned", + bootstrap_config.is_some(), + "bootstrap_config should be Some when becoming desired Provisioned", )?; - (initial_runtime_desired_status, bootstrap_policy) + (initial_runtime_desired_status, bootstrap_config) } }; @@ -956,9 +956,9 @@ pub(crate) async fn set_deployment_resources_desired_status( } // If the current bootstrap policy is already set, it cannot be changed - if let Some(current_bootstrap_policy) = current.bootstrap_policy { - if let Some(new_bootstrap_policy) = final_bootstrap_policy { - if current_bootstrap_policy != new_bootstrap_policy { + if let Some(current_bootstrap_config) = current.bootstrap_config { + if let Some(new_bootstrap_config) = final_bootstrap_config { + if current_bootstrap_config != new_bootstrap_config { return Err(DBError::BootstrapPolicyImmutableUnlessStopped); } } @@ -1003,7 +1003,7 @@ pub(crate) async fn set_deployment_resources_desired_status( &[ &new_desired_status.to_string(), &final_deployment_initial.map(runtime_desired_status_to_string), - &final_bootstrap_policy.map(bootstrap_policy_to_string), + &final_bootstrap_config.map(bootstrap_config_to_string), &match final_deployment_error { None => None, Some(v) => Some(serialize_error_response(&v)?), @@ -1156,7 +1156,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning( None, None, current.deployment_initial, - current.bootstrap_policy, + current.bootstrap_config, ) .await } @@ -1229,7 +1229,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned( Some(new_deployment_location), None, current.deployment_initial, - current.bootstrap_policy, + current.bootstrap_config, ) .await } @@ -1359,7 +1359,7 @@ async fn set_deployment_resources_status( final_deployment_location: Option, new_storage_status_details: Option, final_deployment_initial: Option, - final_bootstrap_policy: Option, + final_bootstrap_config: Option, ) -> Result<(), DBError> { // Validate that the new or existing deployment configuration is valid if let Some(deployment_config) = &final_deployment_config { @@ -1417,7 +1417,7 @@ async fn set_deployment_resources_status( &new_storage_status_details.map(|v| v.to_string()), // $4: storage_status_details &final_deployment_id, // $5: deployment_id &final_deployment_initial.map(runtime_desired_status_to_string), // $6: deployment_initial - &final_bootstrap_policy.map(bootstrap_policy_to_string), // $7: bootstrap_policy + &final_bootstrap_config.map(bootstrap_config_to_string), // $7: bootstrap_config &final_deployment_resources_status.to_string(), // $8: deployment_resources_status, &final_deployment_resources_status_details.map(|v| v.to_string()), // $9: deployment_resources_status_details, &final_deployment_runtime_status.map(runtime_status_to_string), // $10: deployment_runtime_status, diff --git a/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs b/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs index 1ee5a679cfd..f53d5f50fc3 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs @@ -3,7 +3,7 @@ use crate::db::types::monitor::{ ExtendedPipelineMonitorEvent, PipelineMonitorEvent, PipelineMonitorEventId, }; use crate::db::types::pipeline::{ - parse_string_as_bootstrap_policy, parse_string_as_runtime_desired_status, + parse_string_as_bootstrap_config, parse_string_as_runtime_desired_status, parse_string_as_runtime_status, ExtendedPipelineDescr, ExtendedPipelineDescrEventInfo, ExtendedPipelineDescrMonitoring, PipelineId, }; @@ -17,7 +17,7 @@ use crate::db::types::utils::{ use crate::db::types::version::Version; use chrono::{DateTime, Utc}; use feldera_types::error::ErrorResponse; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; +use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; use tokio_postgres::Row; use tracing::error; use uuid::Uuid; @@ -89,7 +89,7 @@ pub fn parse_pipeline_row_all(row: &Row) -> Result Result Option Result, DBError> { +fn parse_from_row_bootstrap_config(row: &Row) -> Result, DBError> { Ok(match row.get::<_, Option>("bootstrap_policy") { None => None, - Some(s) => Some(parse_string_as_bootstrap_policy(s)?), + Some(s) => Some(parse_string_as_bootstrap_config(s)?), }) } @@ -611,7 +611,7 @@ mod tests { storage_status_details: None, deployment_id: None, deployment_initial: None, - bootstrap_policy: None, + bootstrap_config: None, deployment_resources_status: ResourcesStatus::Stopped, deployment_resources_status_since: Default::default(), deployment_resources_status_details: None, @@ -660,7 +660,7 @@ mod tests { deployment_runtime_status_details: None, deployment_runtime_status_since: None, deployment_runtime_desired_status: None, - bootstrap_policy: None, + bootstrap_config: None, deployment_runtime_desired_status_since: None, }) .unwrap(); diff --git a/crates/pipeline-manager/src/db/storage.rs b/crates/pipeline-manager/src/db/storage.rs index 911956e7445..a7d020be6db 100644 --- a/crates/pipeline-manager/src/db/storage.rs +++ b/crates/pipeline-manager/src/db/storage.rs @@ -14,7 +14,7 @@ use crate::db::types::tenant::TenantId; use crate::db::types::version::Version; use async_trait::async_trait; use feldera_types::error::ErrorResponse; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; +use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; use uuid::Uuid; #[derive(Debug, PartialEq, Eq)] @@ -59,7 +59,7 @@ impl ExtendedPipelineDescrRunner { .clone(), deployment_runtime_status_since: pipeline.deployment_runtime_status_since, deployment_runtime_desired_status: pipeline.deployment_runtime_desired_status, - bootstrap_policy: pipeline.bootstrap_policy, + bootstrap_config: pipeline.bootstrap_config, deployment_runtime_desired_status_since: pipeline .deployment_runtime_desired_status_since, }, @@ -317,7 +317,7 @@ pub(crate) trait Storage { tenant_id: TenantId, pipeline_name: &str, initial: RuntimeDesiredStatus, - bootstrap_policy: BootstrapPolicy, + bootstrap_config: BootstrapConfig, dismiss_error: bool, ) -> Result; diff --git a/crates/pipeline-manager/src/db/storage_postgres.rs b/crates/pipeline-manager/src/db/storage_postgres.rs index 35296cc1ae9..2f2e99af84f 100644 --- a/crates/pipeline-manager/src/db/storage_postgres.rs +++ b/crates/pipeline-manager/src/db/storage_postgres.rs @@ -28,7 +28,7 @@ use async_trait::async_trait; use deadpool_postgres::{Manager, Pool, RecyclingMethod}; use feldera_types::config::{PipelineConfig, RuntimeConfig}; use feldera_types::error::ErrorResponse; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; +use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; use tokio_postgres::Row; use tracing::{debug, info}; use uuid::Uuid; @@ -714,7 +714,7 @@ impl Storage for StoragePostgres { tenant_id: TenantId, pipeline_name: &str, initial: RuntimeDesiredStatus, - bootstrap_policy: BootstrapPolicy, + bootstrap_config: BootstrapConfig, dismiss_error: bool, ) -> Result { let mut client = self.pool.get().await?; @@ -725,7 +725,7 @@ impl Storage for StoragePostgres { pipeline_name, ResourcesDesiredStatus::Provisioned, Some(initial), - Some(bootstrap_policy), + Some(bootstrap_config), dismiss_error, ) .await?; diff --git a/crates/pipeline-manager/src/db/test.rs b/crates/pipeline-manager/src/db/test.rs index ae20859af54..e6dee973334 100644 --- a/crates/pipeline-manager/src/db/test.rs +++ b/crates/pipeline-manager/src/db/test.rs @@ -37,7 +37,7 @@ use feldera_types::config::{ use feldera_types::error::ErrorResponse; use feldera_types::program_schema::ProgramSchema; use feldera_types::runtime_status::{ - BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus, StorageStatusDetails, + BootstrapConfig, BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus, StorageStatusDetails, }; use openssl::sha; use proptest::prelude::*; @@ -1620,7 +1620,7 @@ async fn pipeline_transition_after_quick_stop() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -1841,7 +1841,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2047,7 +2047,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2170,7 +2170,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2188,7 +2188,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2260,7 +2260,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2278,7 +2278,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2318,7 +2318,7 @@ async fn pipeline_deployment() { tenant_id, "example1", RuntimeDesiredStatus::Paused, - BootstrapPolicy::default(), + BootstrapConfig::default(), true, ) .await @@ -2457,7 +2457,7 @@ async fn pipeline_provision_version_guard() { tenant_id, &pipeline.name, RuntimeDesiredStatus::Running, - BootstrapPolicy::default(), + BootstrapConfig::default(), false, ) .await @@ -2523,7 +2523,7 @@ async fn pipeline_provision_version_guard() { tenant_id, &pipeline.name, RuntimeDesiredStatus::Paused, - BootstrapPolicy::Allow, + BootstrapConfig::from(BootstrapPolicy::Allow), false, ) .await @@ -3473,8 +3473,8 @@ fn db_impl_behaves_like_model() { } StorageAction::SetDeploymentResourcesDesiredStatusProvisioned(tenant_id, pipeline_name, initial, dismiss_error) => { create_tenants_if_not_exists(&model, &handle, tenant_id).await.unwrap(); - let model_response = model.set_deployment_resources_desired_status_provisioned(tenant_id, &pipeline_name, initial, BootstrapPolicy::default(), dismiss_error).await; - let impl_response = handle.db.set_deployment_resources_desired_status_provisioned(tenant_id, &pipeline_name, initial, BootstrapPolicy::default(), dismiss_error).await; + let model_response = model.set_deployment_resources_desired_status_provisioned(tenant_id, &pipeline_name, initial, BootstrapConfig::default(), dismiss_error).await; + let impl_response = handle.db.set_deployment_resources_desired_status_provisioned(tenant_id, &pipeline_name, initial, BootstrapConfig::default(), dismiss_error).await; check_responses(i, model_response, impl_response); } StorageAction::SetDeploymentResourcesDesiredStatusStoppedIfNotProvisioned(tenant_id, pipeline_name) => { @@ -3995,7 +3995,7 @@ fn convert_descriptor_to_monitoring( deployment_runtime_status_details: pipeline.deployment_runtime_status_details, deployment_runtime_status_since: pipeline.deployment_runtime_status_since, deployment_runtime_desired_status: pipeline.deployment_runtime_desired_status, - bootstrap_policy: pipeline.bootstrap_policy, + bootstrap_config: pipeline.bootstrap_config, deployment_runtime_desired_status_since: pipeline.deployment_runtime_desired_status_since, } } @@ -4378,7 +4378,7 @@ impl Storage for Mutex { deployment_runtime_status_since: None, deployment_runtime_desired_status: None, deployment_runtime_desired_status_since: None, - bootstrap_policy: None, + bootstrap_config: None, }; // Insert into state @@ -4760,7 +4760,7 @@ impl Storage for Mutex { tenant_id: TenantId, pipeline_name: &str, initial: RuntimeDesiredStatus, - bootstrap_policy: BootstrapPolicy, + bootstrap_config: BootstrapConfig, dismiss_error: bool, ) -> Result { // Validate @@ -4782,8 +4782,8 @@ impl Storage for Mutex { return Err(DBError::InitialImmutableUnlessStopped); } if pipeline - .bootstrap_policy - .is_some_and(|v| v != bootstrap_policy) + .bootstrap_config + .is_some_and(|v| v != bootstrap_config) { return Err(DBError::BootstrapPolicyImmutableUnlessStopped); } @@ -4808,7 +4808,7 @@ impl Storage for Mutex { // Apply changes: update pipeline.deployment_initial = Some(initial); - pipeline.bootstrap_policy = Some(bootstrap_policy); + pipeline.bootstrap_config = Some(bootstrap_config); pipeline.deployment_resources_desired_status = new_resources_desired_status; pipeline.deployment_resources_desired_status_since = Utc::now(); pipeline.deployment_error = new_deployment_error; @@ -4846,7 +4846,7 @@ impl Storage for Mutex { } pipeline.deployment_resources_desired_status = new_resources_desired_status; pipeline.deployment_resources_desired_status_since = Utc::now(); - pipeline.bootstrap_policy = None; + pipeline.bootstrap_config = None; self.lock() .await .pipelines @@ -4883,7 +4883,7 @@ impl Storage for Mutex { } pipeline.deployment_resources_desired_status = new_resources_desired_status; pipeline.deployment_resources_desired_status_since = Utc::now(); - pipeline.bootstrap_policy = None; + pipeline.bootstrap_config = None; pipeline.refresh_version = Version(pipeline.refresh_version.0 + 1); self.lock() .await @@ -5138,7 +5138,7 @@ impl Storage for Mutex { // Set resources desired status to Stopped pipeline.deployment_initial = None; - pipeline.bootstrap_policy = None; + pipeline.bootstrap_config = None; pipeline.deployment_resources_desired_status = ResourcesDesiredStatus::Stopped; pipeline.deployment_resources_desired_status_since = Utc::now(); self.lock() @@ -5154,7 +5154,7 @@ impl Storage for Mutex { } pipeline.deployment_id = None; pipeline.deployment_initial = None; - pipeline.bootstrap_policy = None; + pipeline.bootstrap_config = None; pipeline.deployment_config = None; pipeline.deployment_location = None; pipeline.deployment_error = deployment_error; @@ -5223,7 +5223,7 @@ impl Storage for Mutex { // Apply changes pipeline.deployment_id = None; pipeline.deployment_initial = None; - pipeline.bootstrap_policy = None; + pipeline.bootstrap_config = None; pipeline.deployment_config = None; pipeline.deployment_location = None; // Retain: pipeline.deployment_error diff --git a/crates/pipeline-manager/src/db/types/pipeline.rs b/crates/pipeline-manager/src/db/types/pipeline.rs index 2ef5402fb98..222a06196d3 100644 --- a/crates/pipeline-manager/src/db/types/pipeline.rs +++ b/crates/pipeline-manager/src/db/types/pipeline.rs @@ -5,7 +5,9 @@ use crate::db::types::storage::StorageStatus; use crate::db::types::version::Version; use chrono::{DateTime, Utc}; use feldera_types::error::ErrorResponse; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; +use feldera_types::runtime_status::{ + BootstrapConfig, BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus, +}; use serde::{Deserialize, Serialize}; use std::fmt; use std::fmt::Display; @@ -92,19 +94,28 @@ pub fn parse_string_as_runtime_desired_status(s: String) -> Result String { match bootstrap { - BootstrapPolicy::Allow => "allow", - BootstrapPolicy::Reject => "reject", - BootstrapPolicy::AwaitApproval => "await_approval", + BootstrapPolicy::Allow => "allow".to_string(), + BootstrapPolicy::Reject => "reject".to_string(), + BootstrapPolicy::AwaitApproval => "await_approval".to_string(), } - .to_string() } -pub fn parse_string_as_bootstrap_policy(s: String) -> Result { +/// Serialize BootstrapConfig as a JSON string. +pub fn bootstrap_config_to_string(bootstrap: BootstrapConfig) -> String { + serde_json::to_string(&bootstrap).unwrap() +} + +/// Backward compatible deserialization of the bootstrap_policy field. +/// +/// Old format: bootstrap policy only, silent_bootstrap is implied to be false. +/// New format: BootstrapConfig serialized as a JSON string. +pub fn parse_string_as_bootstrap_config(s: String) -> Result { match s.as_str() { - "allow" => Ok(BootstrapPolicy::Allow), - "reject" => Ok(BootstrapPolicy::Reject), - "await_approval" => Ok(BootstrapPolicy::AwaitApproval), - _ => Err(DBError::InvalidBootstrap(s)), + "allow" => Ok(BootstrapConfig::from(BootstrapPolicy::Allow)), + "reject" => Ok(BootstrapConfig::from(BootstrapPolicy::Reject)), + "await_approval" => Ok(BootstrapConfig::from(BootstrapPolicy::AwaitApproval)), + _ => serde_json::from_str::(&s) + .map_err(|_| DBError::InvalidBootstrap(s.clone())), } } @@ -250,7 +261,8 @@ pub struct ExtendedPipelineDescr { /// Policy enforced when the pipeline has to bootstrap /// due to detected changes caused by recompilation. - pub bootstrap_policy: Option, + #[serde(flatten)] + pub bootstrap_config: Option, /// Resources status of the current deployment. pub deployment_resources_status: ResourcesStatus, @@ -318,7 +330,8 @@ pub struct ExtendedPipelineDescrMonitoring { pub deployment_runtime_status_details: Option, pub deployment_runtime_status_since: Option>, pub deployment_runtime_desired_status: Option, - pub bootstrap_policy: Option, + #[serde(flatten)] + pub bootstrap_config: Option, pub deployment_runtime_desired_status_since: Option>, } diff --git a/crates/pipeline-manager/src/runner/local_runner.rs b/crates/pipeline-manager/src/runner/local_runner.rs index d6ea3b8226d..8c822a846be 100644 --- a/crates/pipeline-manager/src/runner/local_runner.rs +++ b/crates/pipeline-manager/src/runner/local_runner.rs @@ -17,7 +17,7 @@ use feldera_observability::ReqwestTracingExt; use feldera_types::config::{ PipelineConfig, PipelineConfigProgramInfo, StorageCacheConfig, StorageConfig, }; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus}; +use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus}; use reqwest::StatusCode; use serde_json::json; use std::io::ErrorKind; @@ -424,7 +424,7 @@ impl PipelineExecutor for LocalRunner { async fn provision( &mut self, deployment_initial: RuntimeDesiredStatus, - bootstrap_policy: Option, + bootstrap_config: Option, deployment_id: &Uuid, deployment_config: &PipelineConfig, _program_info: &serde_json::Value, @@ -587,10 +587,15 @@ impl PipelineExecutor for LocalRunner { .stdout(Stdio::piped()) .stderr(Stdio::piped()); - if let Some(bootstrap_policy) = bootstrap_policy { + if let Some(bootstrap_config) = bootstrap_config { command .arg("--bootstrap-policy") - .arg(bootstrap_policy_to_string(bootstrap_policy)); + .arg(bootstrap_policy_to_string( + bootstrap_config.bootstrap_policy, + )); + if bootstrap_config.silent_bootstrap { + command.arg("--silent-bootstrap"); + } } if let Some((https_tls_cert_path, https_tls_key_path)) = diff --git a/crates/pipeline-manager/src/runner/pipeline_automata.rs b/crates/pipeline-manager/src/runner/pipeline_automata.rs index 1fdbe0e912f..1de881bae81 100644 --- a/crates/pipeline-manager/src/runner/pipeline_automata.rs +++ b/crates/pipeline-manager/src/runner/pipeline_automata.rs @@ -1297,9 +1297,9 @@ impl PipelineAutomaton { None }; - let bootstrap_policy = + let bootstrap_config = if Self::platform_version_requires_bootstrap_policy(&pipeline.platform_version) { - Some(pipeline.bootstrap_policy.unwrap_or_default()) + Some(pipeline.bootstrap_config.unwrap_or_default()) } else { None }; @@ -1318,7 +1318,7 @@ impl PipelineAutomaton { .pipeline_handle .provision( deployment_initial, - bootstrap_policy, + bootstrap_config, &deployment_id, &deployment_config, program_info, @@ -1827,7 +1827,7 @@ mod test { use async_trait::async_trait; use feldera_types::config::{PipelineConfig, StorageConfig}; use feldera_types::program_schema::ProgramSchema; - use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus, RuntimeStatus}; + use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; use serde_json::json; use std::str::FromStr; use std::sync::Arc; @@ -1867,7 +1867,7 @@ mod test { async fn provision( &mut self, _: RuntimeDesiredStatus, - _: Option, + _: Option, _: &Uuid, _: &PipelineConfig, _: &serde_json::Value, @@ -1921,7 +1921,7 @@ mod test { automaton.tenant_id, &pipeline.name, initial, - BootstrapPolicy::default(), + BootstrapConfig::default(), true, ) .await diff --git a/crates/pipeline-manager/src/runner/pipeline_executor.rs b/crates/pipeline-manager/src/runner/pipeline_executor.rs index 02e9529bfbc..4562eae83ae 100644 --- a/crates/pipeline-manager/src/runner/pipeline_executor.rs +++ b/crates/pipeline-manager/src/runner/pipeline_executor.rs @@ -5,7 +5,7 @@ use crate::error::ManagerError; use crate::runner::pipeline_logs::LogsSender; use async_trait::async_trait; use feldera_types::config::{PipelineConfig, StorageConfig}; -use feldera_types::runtime_status::{BootstrapPolicy, RuntimeDesiredStatus}; +use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus}; use std::time::Duration; use uuid::Uuid; @@ -55,7 +55,7 @@ pub trait PipelineExecutor: Sync + Send { async fn provision( &mut self, deployment_initial: RuntimeDesiredStatus, - bootstrap_policy: Option, + bootstrap_config: Option, deployment_id: &Uuid, deployment_config: &PipelineConfig, program_info: &serde_json::Value, diff --git a/docs.feldera.com/docs/pipelines/modifying.md b/docs.feldera.com/docs/pipelines/modifying.md index 35e0beaee4b..df1532bcb8a 100644 --- a/docs.feldera.com/docs/pipelines/modifying.md +++ b/docs.feldera.com/docs/pipelines/modifying.md @@ -131,6 +131,27 @@ from input connectors. Input connectors that were not modified in the new versio whose tables have not changed resume ingestion from their checkpointed position in the input stream; other input connectors will start from the initial state specified in their configuration. +### Silent bootstrapping + +By default, bootstrapping sends the complete contents of new and modified views +to their output connectors. This is useful when the connected sink must be +rebuilt from the bootstrapped view contents, but it can produce duplicate or +stale records if the sink already contains the correct data. This is the case +for example when bootstrapping is triggered by a Feldera upgrade (see +[below](#caveat-1-feldera-runtime-upgrade-can-modify-the-pipeline)). + +Set `silent_bootstrap=true` to bootstrap the pipeline with output connectors +disabled for the duration of bootstrapping. The pipeline still evaluates the +new and modified views and updates its internal state. Output connector progress +also advances through the bootstrapped input records, but no bootstrapped output +records are transmitted to external sinks. Once bootstrapping completes and the +pipeline enters `Running` or `Paused`, output connectors resume normal operation +and emit subsequent incremental changes. + +Use silent bootstrapping when the external sink already has the desired contents. +Do not use it if the sink needs to receive the full contents of newly +added or modified views during bootstrap. + ## Caveats and limitations ### Caveat 1: Feldera runtime upgrade can modify the pipeline @@ -156,6 +177,9 @@ If this is undesirable, use one of the following methods to avoid updating the S 2. Pin the pipeline to a specific runtime version using the [`runtime_version` property](/pipelines/lifecycle). +3. Use [silent bootstrapping](#silent-bootstrapping) to avoid sending duplicate data to output + connectors during bootstrapping. + ### Caveat 2: Starting from an S3 checkpoint Normally, when restarting, a pipeline continues from the checkpoint taken at the point when the pipeline was stopped, containing @@ -215,6 +239,11 @@ All of this functionality is also available via the Python SDK and the `fda` CLI `await_approval`, `allow`, and `reject`. The default value is `await_approval`. See the [Restart the pipeline](#3-restart-the-pipeline) section above for details. +* The `silent_bootstrap` argument to the [`/start` endpoint](/api/start-pipeline) and + [`/approve` endpoint](/api/approve-bootstrap). Set this argument to `true` + to suppress output connector records during bootstrapping, as described in + [Silent bootstrapping](#silent-bootstrapping). + * Two runtime states reported by the [pipeline status endpoint](/api/get-pipeline) in the `deployment_runtime_status` field: * `AwaitingApproval` - When starting the pipeline with `bootstrap_policy=await_approval`, if the pipeline requires bootstrapping, it will stop in the `AwaitingApproval` state waiting for the user to approve the @@ -227,6 +256,8 @@ All of this functionality is also available via the Python SDK and the `fda` CLI * The [`/approve` endpoint](/api/approve-bootstrap). Invoking this endpoint transitions the pipeline from the `AwaitingApproval` to `Bootstrapping` state. + Use `/approve?silent_bootstrap=true` to approve the changes while suppressing + output connector records produced during bootstrapping. ## WebConsole diff --git a/openapi.json b/openapi.json index 1c1344fb7e2..34503946855 100644 --- a/openapi.json +++ b/openapi.json @@ -1946,6 +1946,15 @@ "schema": { "type": "string" } + }, + { + "name": "silent_bootstrap", + "in": "path", + "description": "Bootstrap the pipeline with output connectors disabled.", + "required": true, + "schema": { + "type": "boolean" + } } ], "responses": { @@ -5152,11 +5161,21 @@ { "name": "bootstrap_policy", "in": "query", + "description": "Bootstrap policy.", "required": false, "schema": { "$ref": "#/components/schemas/BootstrapPolicy" } }, + { + "name": "silent_bootstrap", + "in": "query", + "description": "Bootstrap the pipeline with output connectors disabled.", + "required": false, + "schema": { + "type": "boolean" + } + }, { "name": "dismiss_error", "in": "query", diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index 191e7a4d5af..77a676aad5b 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -506,6 +506,7 @@ def activate( def start( self, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: Optional[float] = None, dismiss_error: bool = True, @@ -520,6 +521,7 @@ def start( - If the pipeline is in PAUSED state, use `.meth:resume` instead. :param bootstrap_policy: The bootstrap policy to use. + :param silent_bootstrap: Set True to bootstrap the pipeline with output connectors disabled. False by default. :param timeout_s: The maximum time (in seconds) to wait for the pipeline to start. :param wait: Set True to wait for the pipeline to start. True by default @@ -531,6 +533,7 @@ def start( self.client.start_pipeline( self.name, bootstrap_policy=bootstrap_policy, + silent_bootstrap=silent_bootstrap, wait=wait, timeout_s=timeout_s, dismiss_error=dismiss_error, @@ -539,6 +542,7 @@ def start( def start_paused( self, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: Optional[float] = None, dismiss_error: bool = True, @@ -557,6 +561,7 @@ def start_paused( return self.client.start_pipeline_as_paused( self.name, bootstrap_policy=bootstrap_policy, + silent_bootstrap=silent_bootstrap, wait=wait, timeout_s=timeout_s, dismiss_error=dismiss_error, @@ -565,6 +570,7 @@ def start_paused( def start_standby( self, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: Optional[float] = None, dismiss_error: bool = True, @@ -583,6 +589,7 @@ def start_standby( self.client.start_pipeline_as_standby( self.name, bootstrap_policy=bootstrap_policy, + silent_bootstrap=silent_bootstrap, wait=wait, timeout_s=timeout_s, dismiss_error=dismiss_error, @@ -591,6 +598,7 @@ def start_standby( def restart( self, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, timeout_s: Optional[float] = None, dismiss_error: bool = True, ): @@ -611,6 +619,7 @@ def restart( self.stop(force=True, timeout_s=timeout_s) self.start( bootstrap_policy=bootstrap_policy, + silent_bootstrap=silent_bootstrap, timeout_s=timeout_s, dismiss_error=dismiss_error, ) @@ -655,7 +664,7 @@ def dismiss_error(self): self.client.dismiss_error_pipeline(self.name) - def approve(self): + def approve(self, silent_bootstrap: bool = False): """ Approves the pipeline to proceed with bootstrapping. @@ -665,7 +674,7 @@ def approve(self): before proceeding with the bootstrapping process. """ - self.client.approve_pipeline(self.name) + self.client.approve_pipeline(self.name, silent_bootstrap=silent_bootstrap) def resume(self, wait: bool = True, timeout_s: Optional[float] = None): """ diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index d07930235df..415463819bf 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -485,6 +485,7 @@ def _inner_start_pipeline( pipeline_name: str, initial: str = "running", bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: Optional[float] = None, dismiss_error: bool = True, @@ -512,6 +513,9 @@ def _inner_start_pipeline( if bootstrap_policy is not None: start_params["bootstrap_policy"] = bootstrap_policy.value + if silent_bootstrap: + start_params["silent_bootstrap"] = "true" + self.http.post( path=f"/pipelines/{pipeline_name}/start", params=start_params, @@ -530,6 +534,7 @@ def start_pipeline( self, pipeline_name: str, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: Optional[float] = None, dismiss_error: bool = True, @@ -549,6 +554,7 @@ def start_pipeline( pipeline_name, "running", bootstrap_policy, + silent_bootstrap, wait, timeout_s, dismiss_error, @@ -558,6 +564,7 @@ def start_pipeline_as_paused( self, pipeline_name: str, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: float | None = None, dismiss_error: bool = True, @@ -576,6 +583,7 @@ def start_pipeline_as_paused( pipeline_name, "paused", bootstrap_policy, + silent_bootstrap, wait, timeout_s, dismiss_error, @@ -585,6 +593,7 @@ def start_pipeline_as_standby( self, pipeline_name: str, bootstrap_policy: Optional[BootstrapPolicy] = None, + silent_bootstrap: bool = False, wait: bool = True, timeout_s: Optional[float] = None, dismiss_error: bool = True, @@ -603,6 +612,7 @@ def start_pipeline_as_standby( pipeline_name, "standby", bootstrap_policy, + silent_bootstrap, wait, timeout_s, dismiss_error, diff --git a/python/feldera/testutils.py b/python/feldera/testutils.py index 1f4fd45150e..ba8d4b5c772 100644 --- a/python/feldera/testutils.py +++ b/python/feldera/testutils.py @@ -264,6 +264,7 @@ def build_pipeline( tables: dict, views: List[ViewSpec], resources: Optional[Resources] = None, + dev_tweaks: Optional[dict] = None, ) -> Pipeline: sql = generate_program(tables, views) @@ -277,6 +278,7 @@ def build_pipeline( resources=resources, workers=FELDERA_TEST_NUM_WORKERS, hosts=FELDERA_TEST_NUM_HOSTS, + dev_tweaks=dev_tweaks, ), ).create_or_replace() @@ -319,6 +321,25 @@ def transaction(pipeline: Pipeline, duration_seconds: int): log(f"Transaction committed in {time.monotonic() - commit_start} seconds") +def transaction_num_records(pipeline: Pipeline, num_records: int): + """Run a transaction until it ingests a record count or reaches end of input.""" + + log(f"Running transaction for {num_records} records or end of input") + initial_records = number_of_processed_records(pipeline) + pipeline.start_transaction() + + while not check_end_of_input(pipeline): + processed_records = number_of_processed_records(pipeline) - initial_records + if processed_records >= num_records: + break + time.sleep(3) + + log("Committing transaction") + commit_start = time.monotonic() + pipeline.commit_transaction() + log(f"Transaction committed in {time.monotonic() - commit_start} seconds") + + def checkpoint_pipeline(pipeline: Pipeline): """Create a checkpoint and wait for it to complete.""" @@ -362,6 +383,12 @@ def number_of_processed_records(pipeline: Pipeline) -> int: return pipeline.stats().global_metrics.total_processed_records +def number_of_input_records(pipeline: Pipeline) -> int: + """Get the total_input_records metric.""" + + return pipeline.stats().global_metrics.total_input_records + + def run_workload( pipeline_name: str, tables: dict, diff --git a/python/tests/platform/test_bootstrapping.py b/python/tests/platform/test_bootstrapping.py index ddc2cda9327..472dc3084a9 100644 --- a/python/tests/platform/test_bootstrapping.py +++ b/python/tests/platform/test_bootstrapping.py @@ -1,9 +1,14 @@ +import json +import os +import uuid + from feldera.enums import BootstrapPolicy, PipelineStatus from feldera.pipeline_builder import PipelineBuilder from feldera.runtime_config import RuntimeConfig from tests import TEST_CLIENT, enterprise_only from .helper import ( gen_pipeline_name, + wait_for_condition, ) from feldera.testutils import ( FELDERA_TEST_NUM_WORKERS, @@ -277,6 +282,141 @@ def test_bootstrap_enterprise(pipeline_name): pipeline.stop(force=True) +@enterprise_only +@gen_pipeline_name +def test_silent_bootstrap_enterprise(pipeline_name): + """ + Enterprise: silent bootstrapping must process backfilled records without + transmitting them to output connectors. + """ + + output_path = os.path.join( + "/tmp", f"feldera_silent_bootstrap_{uuid.uuid4().hex}.json" + ) + + def sql_for_view(view_expr: str) -> str: + connectors = json.dumps( + [ + { + "name": "out", + "transport": { + "name": "file_output", + "config": {"path": output_path}, + }, + "format": {"name": "json"}, + } + ] + ) + return f""" +CREATE TABLE t1(x int) WITH ('materialized'='true'); +CREATE MATERIALIZED VIEW v1 +WITH ('connectors' = '{connectors}') +AS SELECT {view_expr} AS x FROM t1; +""" + + def output_metrics(): + return pipeline.output_connector_stats("v1", "out").metrics + + def processed_records() -> int: + return output_metrics().total_processed_input_records or 0 + + def transmitted_records() -> int: + return output_metrics().transmitted_records or 0 + + def wait_for_output_progress( + min_processed_records: int, expected_transmitted_records: int + ): + wait_for_condition( + f"output connector reaches {min_processed_records} processed records", + lambda: processed_records() >= min_processed_records, + timeout_s=120.0, + poll_interval_s=1.0, + ) + assert transmitted_records() == expected_transmitted_records + + pipeline = PipelineBuilder( + TEST_CLIENT, + pipeline_name, + sql=sql_for_view("x"), + runtime_config=RuntimeConfig( + workers=FELDERA_TEST_NUM_WORKERS, + hosts=FELDERA_TEST_NUM_HOSTS, + fault_tolerance_model=None, + ), + ).create_or_replace() + + pipeline.start() + pipeline.input_json("t1", [{"x": 1}, {"x": 2}, {"x": 3}]) + wait_for_output_progress(min_processed_records=3, expected_transmitted_records=3) + expected_processed_records = 3 + pipeline.checkpoint(True) + pipeline.stop(force=True) + + pipeline.modify(sql=sql_for_view("x + 1")) + pipeline.start( + bootstrap_policy=BootstrapPolicy.ALLOW, + silent_bootstrap=True, + timeout_s=300, + ) + wait_for_output_progress( + min_processed_records=expected_processed_records, + expected_transmitted_records=0, + ) + + pipeline.input_json("t1", [{"x": 5}]) + expected_processed_records += 1 + wait_for_output_progress( + min_processed_records=expected_processed_records, + expected_transmitted_records=1, + ) + assert list(pipeline.query("SELECT COUNT(*) AS c FROM v1;")) == [{"c": 4}] + pipeline.checkpoint(True) + pipeline.stop(force=True) + + pipeline.modify(sql=sql_for_view("x + 2")) + pipeline.start( + bootstrap_policy=BootstrapPolicy.ALLOW, + silent_bootstrap=True, + timeout_s=300, + ) + wait_for_output_progress( + min_processed_records=expected_processed_records, + expected_transmitted_records=0, + ) + + pipeline.input_json("t1", [{"x": 6}]) + expected_processed_records += 1 + wait_for_output_progress( + min_processed_records=expected_processed_records, + expected_transmitted_records=1, + ) + assert list(pipeline.query("SELECT COUNT(*) AS c FROM v1;")) == [{"c": 5}] + pipeline.checkpoint(True) + pipeline.stop(force=True) + + # Final round with silent bootstrap disabled should produce all accumulated records. + pipeline.modify(sql=sql_for_view("x + 3")) + pipeline.start( + bootstrap_policy=BootstrapPolicy.ALLOW, + silent_bootstrap=False, + timeout_s=300, + ) + wait_for_output_progress( + min_processed_records=expected_processed_records, + expected_transmitted_records=expected_processed_records, + ) + + pipeline.input_json("t1", [{"x": 7}]) + expected_processed_records += 1 + wait_for_output_progress( + min_processed_records=expected_processed_records, + expected_transmitted_records=expected_processed_records, + ) + assert list(pipeline.query("SELECT COUNT(*) AS c FROM v1;")) == [{"c": 6}] + pipeline.checkpoint(True) + pipeline.stop(force=True) + + @enterprise_only @gen_pipeline_name def test_bootstrap_non_materialized_table_enterprise(pipeline_name): diff --git a/python/tests/workloads/test_tpch.py b/python/tests/workloads/test_tpch.py index d68d3dccc70..2d4bc5ec469 100644 --- a/python/tests/workloads/test_tpch.py +++ b/python/tests/workloads/test_tpch.py @@ -1,20 +1,24 @@ import sys import time import unittest +from feldera.enums import BootstrapPolicy from feldera.pipeline import Pipeline from feldera.testutils import ( IndexSpec, ViewSpec, build_pipeline, check_for_endpoint_errors, + check_end_of_input, checkpoint_pipeline, generate_program, log, number_of_processed_records, + number_of_input_records, run_workload, - transaction, + transaction_num_records, unique_pipeline_name, validate_outputs, + wait_end_of_input, ) import tempfile import os @@ -32,6 +36,8 @@ def __init__( s3_path: Optional[str] = None, s3_region: Optional[str] = None, input_dir: Optional[str] = None, + segment_size: Optional[int] = None, + num_segments: Optional[int] = None, ): self.mode = mode @@ -39,6 +45,12 @@ def __init__( raise ValueError(f"Unknown mode: {mode}") self.input_mode = input_mode + self.segment_size = segment_size + self.num_segments = num_segments + + if self.num_segments is not None: + if self.num_segments <= 0 or self.num_segments > 20: + raise ValueError("num_segments must be between 1 and 20") if self.input_mode == "file": self.input_dir = input_dir @@ -117,6 +129,22 @@ def run_cli(): help="S3 bucket region. Required if --s3-bucket or --s3-path is specified.", ) + parser.add_argument( + "--num-segments", + type=int, + nargs="?", + default=None, + help="Number of test segments. Only used in checkpoint mode. The test divides all views in the benchmark into this many groups and adds one group of views per segment to the pipeline.", + ) + + parser.add_argument( + "--segment-size", + type=int, + nargs="?", + default=None, + help="Approximate number of records ingested per segment. Only used in checkpoint mode.", + ) + args = parser.parse_args() if sum(x is not None for x in (args.s3_bucket, args.s3_path, args.input_dir)) > 1: @@ -131,6 +159,7 @@ def run_cli(): elif args.s3_bucket: input_mode = "s3" s3_region = args.s3_region + s3_path = None elif args.s3_path: input_mode = "delta" s3_path = args.s3_path @@ -153,6 +182,8 @@ def run_cli(): s3_region=s3_region, s3_path=s3_path, input_dir=args.input_dir, + segment_size=args.segment_size, + num_segments=args.num_segments, ) tpch_test(config) @@ -1297,24 +1328,34 @@ def tpch_test_segment( pipeline: Pipeline, tables: dict, views: List[ViewSpec], - expected_processed_records, + last_processed: int, segment_size: int, -) -> tuple[bool, int]: + previously_non_empty_views: List[str], +) -> tuple[bool, int, List[str]]: """Run a test segment. - Start the pipeline (from a checkpoint if one exists), run a series of transactions followed by streaming ingest periods, - until the pipeline processed segment_size records. A checkpoint is created halfway through the segment, or at the end. The - pipeline is then paused,outputs validated, and the pipeline stopped. + Start the pipeline (from a checkpoint if one exists), run a transaction to process segment_size records. + A checkpoint is created at the end of the segment. The pipeline is then paused, outputs validated, and + the pipeline stopped. + + previously_non_empty_views is a list of views that were non-empty before the start of the segment. + This is used to check that the views are still non-empty when the pipeline is restarted, i.e., + output snapshots are correctly populated by the bootstrapping process. """ # Start pipeline. start_time = time.monotonic() - log( - f"Starting pipeline to process {segment_size} records, starting from (approximately) {expected_processed_records} processed records" - ) - pipeline.start() + log(f"Starting pipeline to process {segment_size} records") + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) log(f"Pipeline started in {time.monotonic() - start_time} seconds") + for view_name in previously_non_empty_views: + count = view_row_count(pipeline, view_name) + if count == 0: + raise RuntimeError( + f"View {view_name} was non-empty before restart, but is empty after restart" + ) + # Current number of processed records. initial_processed_records = number_of_processed_records(pipeline) @@ -1322,56 +1363,25 @@ def tpch_test_segment( f"Initial processed records at the start of a segment: {initial_processed_records}" ) - if initial_processed_records < expected_processed_records: + if initial_processed_records < last_processed: raise RuntimeError( - f"Expected at least {expected_processed_records} processed records on startup, got {initial_processed_records}" + f"Expected at least {last_processed} processed records on startup, got {initial_processed_records}" ) - # Expected number of processed records after this segment. - expected_processed_records = initial_processed_records + segment_size - - # Make a checkpoint halfway through the segment after processing this many records. - halfway_processed_records = ( - initial_processed_records + expected_processed_records - ) >> 1 - - checkpoint = False + # Transaction + transaction_num_records(pipeline, segment_size) + check_for_endpoint_errors(pipeline) - while not pipeline.is_complete(): - current_processed_records = number_of_processed_records(pipeline) - log( - f"Processed {current_processed_records} total records so far (processed {current_processed_records - initial_processed_records} records in this segment)" - ) + processed_before_checkpoint = number_of_processed_records(pipeline) + non_empty_views = [ + view.name for view in views if view_row_count(pipeline, view.name) > 0 + ] + checkpoint_pipeline(pipeline) - if current_processed_records >= expected_processed_records: - log("Сompleting test segment") - break - - # Transaction - transaction(pipeline, 100) - check_for_endpoint_errors(pipeline) - - # Streaming ingest (no transaction) - log("Running streaming ingest for 10 seconds") - time.sleep(10) - check_for_endpoint_errors(pipeline) - - if not checkpoint: - processed_before_checkpoint = number_of_processed_records(pipeline) - - if processed_before_checkpoint >= halfway_processed_records: - log( - f"Creating checkpoint after processing {processed_before_checkpoint} records" - ) - checkpoint_pipeline(pipeline) - checkpoint = True - - if not checkpoint: - processed_before_checkpoint = number_of_processed_records(pipeline) - log( - f"Creating checkpoint at the end of the segment after processing {processed_before_checkpoint} records" - ) - checkpoint(pipeline) + # Streaming ingest (no transaction) + log("Running streaming ingest for 10 seconds") + time.sleep(10) + check_for_endpoint_errors(pipeline) pipeline.pause() @@ -1383,7 +1393,14 @@ def tpch_test_segment( log("Stopping pipeline") pipeline.stop(force=True) - return (complete, processed_before_checkpoint) + return (complete, processed_before_checkpoint, non_empty_views) + + +def view_row_count(pipeline: Pipeline, view_name: str) -> int: + escaped_view_name = view_name.replace('"', '""') + return next(pipeline.query(f'SELECT COUNT(*) as cnt FROM "{escaped_view_name}"'))[ + "cnt" + ] def tpch_test(config: TPCHTestConfig): @@ -1401,29 +1418,107 @@ def tpch_test(config: TPCHTestConfig): views = tpch_views(q_dirs) if config.mode == "checkpoint": + log("Starting checkpoint mode with all tables and no views") pipeline = build_pipeline( - unique_pipeline_name("tpc-h-checkpoint"), tables, views + unique_pipeline_name("tpc-h-checkpoint"), + tables, + [], + dev_tweaks={"adaptive_joins": True}, ) + if config.segment_size is not None: + segment_size = config.segment_size + else: + segment_size = 100_000_000 + + if config.num_segments is not None: + views_per_segment = ( + len(views) + config.num_segments - 1 + ) // config.num_segments + else: + views_per_segment = 5 last_processed = 0 - iteration = 1 - modified_views = views - while True: - (complete, last_processed) = tpch_test_segment( - pipeline, tables, modified_views, last_processed, 100000000 + complete = False + non_empty_views: List[str] = [] + + view_counts = list(range(views_per_segment, len(views) + 1, views_per_segment)) + if views and (not view_counts or view_counts[-1] != len(views)): + view_counts.append(len(views)) + + for view_count in view_counts: + modified_views = views[:view_count] + log( + f"Checkpoint view-add phase: adding {view_count}/{len(views)} views: " + f"{', '.join(view.name for view in modified_views)}" + ) + + sql = generate_program(tables, modified_views) + pipeline.modify(sql=sql) + log( + f"Running checkpoint segment with {view_count} views from {last_processed} processed records" + ) + + (complete, last_processed, non_empty_views) = tpch_test_segment( + pipeline, + tables, + modified_views, + last_processed, + segment_size, + non_empty_views, + ) + log( + f"Completed checkpoint view-add segment with complete={complete}, last_processed={last_processed}" ) if complete: break - iteration += 1 - modified_views = list( - map( - lambda view: view.clone_with_name(f"{view.name}_{iteration}"), views - ) - ) + # Test a different type of pipeline change: rename all views in the program. + modified_views = [ + view.clone_with_name(f"{view.name}_renamed") for view in views + ] + log(f"Renaming all views: {', '.join(view.name for view in modified_views)}") + + sql = generate_program(tables, modified_views) + pipeline.modify(sql=sql) + + # Process remaining data in one transaction. + pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW) + start_time = time.monotonic() + + try: + pipeline.start_transaction() + except Exception as e: + log(f"Error starting transaction: {e}") + + if config.segment_size is not None: + expected_inputs = number_of_input_records(pipeline) + config.segment_size + while number_of_input_records( + pipeline + ) < expected_inputs and not check_end_of_input(pipeline): + time.sleep(3) + else: + wait_end_of_input(pipeline, timeout_s=3600) - sql = generate_program(tables, modified_views) - pipeline.modify(sql=sql) + elapsed = time.monotonic() - start_time + log(f"Remaining data ingested in {elapsed}") + + start_time = time.monotonic() + try: + pipeline.commit_transaction(transaction_id=None, wait=True, timeout_s=None) + log(f"Commit took {time.monotonic() - start_time}") + except Exception as e: + log(f"Error committing transaction: {e}") + + pipeline.pause() + + # log("Waiting for outputs to flush") + # start_time = time.monotonic() + # pipeline.wait_for_completion(force_stop=False, timeout_s=3600) + # log(f"Flushing outputs took {time.monotonic() - start_time}") + + validate_outputs(pipeline, tables, modified_views) + + pipeline.stop(force=True) elif config.mode == "transaction": pipeline = run_workload(