Skip to content

Commit 194d30f

Browse files
committed
Silent bootstrapping.
Fix #6090. Add /silent_bootstrapping argument to /start and /approve calls, which disables output connectors during bootstrapping. The primary use case for this is bootstrapping the pipeline after a Feldera upgrade where the contents of views is not expected to change. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 7148c81 commit 194d30f

25 files changed

Lines changed: 680 additions & 155 deletions

crates/adapters/src/controller.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2395,6 +2395,9 @@ struct CircuitThread {
23952395
ft: Option<FtState>,
23962396
parker: Parker,
23972397

2398+
/// Whether to suppress output connector records during bootstrapping.
2399+
silent_bootstrap: bool,
2400+
23982401
checkpoint_delay_warning: Option<LongOperationWarning>,
23992402
checkpoint_requests: Vec<CheckpointRequest>,
24002403
running_checkpoint: Option<RunningCheckpoint>,
@@ -2598,17 +2601,18 @@ impl CircuitThread {
25982601

25992602
if !diff.is_empty() {
26002603
info!("Pipeline changes detected: {diff}");
2601-
if state.bootstrap_policy() == BootstrapPolicy::Reject {
2604+
let bootstrap_policy = state.bootstrap_config().bootstrap_policy;
2605+
if bootstrap_policy == BootstrapPolicy::Reject {
26022606
return Err(ControllerError::BootstrapRejectedByUser);
2603-
} else if state.bootstrap_policy() == BootstrapPolicy::AwaitApproval {
2607+
} else if bootstrap_policy == BootstrapPolicy::AwaitApproval {
26042608
info!("Awaiting user approval before bootstrapping modified pipeline.");
26052609
state.set_phase(PipelinePhase::Initializing(
26062610
InitializationState::AwaitingApproval(Box::new(diff.clone())),
26072611
));
26082612
}
26092613

26102614
loop {
2611-
match state.bootstrap_policy() {
2615+
match state.bootstrap_config().bootstrap_policy {
26122616
BootstrapPolicy::Allow => {
26132617
info!(
26142618
"User approved pipeline changes. Proceeding with initialization."
@@ -2758,6 +2762,9 @@ impl CircuitThread {
27582762
backpressure_thread,
27592763
storage,
27602764
parker,
2765+
silent_bootstrap: state
2766+
.map(|state| state.bootstrap_config().silent_bootstrap)
2767+
.unwrap_or(false),
27612768
checkpoint_delay_warning: None,
27622769
checkpoint_requests: Vec::new(),
27632770
running_checkpoint: None,
@@ -3715,16 +3722,36 @@ impl CircuitThread {
37153722
/// this step. If `processed_records` is `None`, we're in the middle of a
37163723
/// transaction and the records are not fully processed yet.
37173724
fn push_output(&mut self, processed_records: Option<ProcessedRecords>) {
3725+
let silent_bootstrap =
3726+
self.silent_bootstrap && self.controller.status.bootstrap_in_progress();
3727+
37183728
let outputs = self.controller.outputs.read().unwrap();
37193729
for (_stream, (output_handles, endpoints)) in outputs.iter_by_stream() {
3720-
let delta_batch = output_handles.delta_handle.as_ref().concat();
3721-
let num_delta_records = delta_batch.len();
3730+
let (mut delta_batch, num_delta_records) = if silent_bootstrap {
3731+
let _ = output_handles.delta_handle.take_from_all();
3732+
(None, 0)
3733+
} else {
3734+
let delta_batch = output_handles.delta_handle.as_ref().concat();
3735+
let num_delta_records = delta_batch.len();
37223736

3723-
let mut delta_batch = Some(delta_batch);
3737+
(Some(delta_batch), num_delta_records)
3738+
};
37243739

37253740
for (i, endpoint_id) in endpoints.iter().enumerate() {
37263741
let endpoint = outputs.lookup_by_id(endpoint_id).unwrap();
37273742

3743+
// Silent bootstrap: send empty batch for progress tracking only.
3744+
if silent_bootstrap {
3745+
self.controller.status.enqueue_batch(*endpoint_id, 0);
3746+
endpoint.queue.push(BatchQueueEntry {
3747+
step: self.step,
3748+
data: None,
3749+
processed_records,
3750+
});
3751+
endpoint.unparker.unpark();
3752+
continue;
3753+
}
3754+
37283755
if endpoint.created_during_transaction_number
37293756
== self.controller.get_transaction_number()
37303757
{

0 commit comments

Comments
 (0)