Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 67 additions & 40 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,9 @@ struct CircuitThread {
ft: Option<FtState>,
parker: Parker,

/// Whether to suppress output connector records during bootstrapping.
silent_bootstrap: bool,

checkpoint_delay_warning: Option<LongOperationWarning>,
checkpoint_requests: Vec<CheckpointRequest>,
running_checkpoint: Option<RunningCheckpoint>,
Expand Down Expand Up @@ -2428,6 +2431,10 @@ struct CircuitThread {
input_metadata: HashMap<String, Option<Resume>>,

commit_updates: Option<CommitUpdates>,

/// Set to true on startup if the circuit requires bootstrapping.
/// Cleared when the circuit completes bootstrapping.
bootstrapping: bool,
}

struct CommitUpdates {
Expand Down Expand Up @@ -2594,17 +2601,18 @@ impl CircuitThread {

if !diff.is_empty() {
info!("Pipeline changes detected: {diff}");
if state.bootstrap_policy() == BootstrapPolicy::Reject {
let bootstrap_policy = state.bootstrap_config().bootstrap_policy;
if bootstrap_policy == BootstrapPolicy::Reject {
return Err(ControllerError::BootstrapRejectedByUser);
} else if state.bootstrap_policy() == BootstrapPolicy::AwaitApproval {
} else if bootstrap_policy == BootstrapPolicy::AwaitApproval {
info!("Awaiting user approval before bootstrapping modified pipeline.");
state.set_phase(PipelinePhase::Initializing(
InitializationState::AwaitingApproval(Box::new(diff.clone())),
));
}

loop {
match state.bootstrap_policy() {
match state.bootstrap_config().bootstrap_policy {
BootstrapPolicy::Allow => {
info!(
"User approved pipeline changes. Proceeding with initialization."
Expand Down Expand Up @@ -2694,9 +2702,9 @@ impl CircuitThread {
incarnation_uuid,
)?;

controller
.status
.set_bootstrap_in_progress(circuit.bootstrap_in_progress());
let bootstrapping = circuit.bootstrap_in_progress();

controller.status.set_bootstrap_in_progress(bootstrapping);

let input_metadata = input_metadata.map(|input_metadata| {
input_metadata
Expand All @@ -2712,7 +2720,7 @@ impl CircuitThread {

// Judging by the input and output persistent id values, the pipeline hasn't changed,
// yet the circuit is bootstrapping. This is a bug.
if can_replay && circuit.bootstrap_in_progress() {
if can_replay && bootstrapping {
return Err(ControllerError::UnexpectedBootstrap {
bootstrap_info: circuit.bootstrap_info().clone(),
});
Expand All @@ -2733,10 +2741,10 @@ impl CircuitThread {
}?;

// The above code ensures that replay and bootstrapping cannot happen at the same time.
assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress()));
assert!(!(ft.is_replaying() && bootstrapping));

// Disable journaling while we're bootstrapping the circuit.
if circuit.bootstrap_in_progress() {
if bootstrapping {
ft.disable();
}

Expand All @@ -2754,6 +2762,9 @@ impl CircuitThread {
backpressure_thread,
storage,
parker,
silent_bootstrap: state
.map(|state| state.bootstrap_config().silent_bootstrap)
.unwrap_or(false),
checkpoint_delay_warning: None,
checkpoint_requests: Vec::new(),
running_checkpoint: None,
Expand All @@ -2764,6 +2775,7 @@ impl CircuitThread {
checkpoint_sender,
input_metadata: input_metadata.unwrap_or_default(),
commit_updates: None,
bootstrapping,
})
}

Expand Down Expand Up @@ -2792,10 +2804,7 @@ impl CircuitThread {
// so that if the first step() we perform below before entering the loop
// ends up finishing bootstrapping, we will still perform an extra step to initialize
// the output table snapshots inside the loop.
let mut trigger = StepTrigger::new(
self.controller.clone(),
self.circuit.bootstrap_in_progress(),
);
let mut trigger = StepTrigger::new(self.controller.clone());
if config.global.cpu_profiler {
self.circuit.enable_cpu_profiler().unwrap_or_else(|e| {
error!("Failed to enable CPU profiler: {e}");
Expand All @@ -2810,7 +2819,7 @@ impl CircuitThread {
//
// Skip this during bootstrap to avoid a slow first step. We don't guarantee
// that view snapshots are up-to-date until bootstrap is complete.
if !self.circuit.bootstrap_in_progress()
if !self.bootstrapping
&& let Err(error) = self.step()
{
let _ = init_status_sender.send(Err(error));
Expand Down Expand Up @@ -2884,7 +2893,11 @@ impl CircuitThread {
self.last_checkpoint(),
self.last_checkpoint_sync(),
self.replaying(),
self.circuit.bootstrap_in_progress(),
// `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping completes;
// that extra transaction is required to initialize the output snapshots.
// The step trigger must force that extra transaction, so we pass `status.bootstrap_in_progress`
// rather than `self.bootstrapping` here.
self.controller.status.bootstrap_in_progress(),
self.checkpoint_requested(),
self.sync_checkpoint_requested(),
coordination_request,
Expand Down Expand Up @@ -2957,11 +2970,6 @@ impl CircuitThread {
transaction_state.into_coordination_status(),
));

// If bootstrapping has completed, update the status flag.
self.controller
.status
.set_bootstrap_in_progress(self.circuit.bootstrap_in_progress());

// Update `trace_snapshot` to the latest traces.
//
// We do this before updating `total_processed_records` so that ad hoc
Expand All @@ -2973,6 +2981,20 @@ impl CircuitThread {
.with_category("Step")
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
.in_scope(|| self.update_snapshot());

let bootstrapping = self.circuit.bootstrap_in_progress();

// If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
// until the circuit performs an extra transaction to initialize output snapshots
// (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()`
// is true).
if self.bootstrapping && !bootstrapping {
self.bootstrapping = false;
} else {
self.controller
.status
.set_bootstrap_in_progress(bootstrapping);
}
}

// Record that we've processed the records, unless there is a transaction in progress,
Expand Down Expand Up @@ -3700,16 +3722,36 @@ impl CircuitThread {
/// this step. If `processed_records` is `None`, we're in the middle of a
/// transaction and the records are not fully processed yet.
fn push_output(&mut self, processed_records: Option<ProcessedRecords>) {
let silent_bootstrap =
self.silent_bootstrap && self.controller.status.bootstrap_in_progress();

let outputs = self.controller.outputs.read().unwrap();
for (_stream, (output_handles, endpoints)) in outputs.iter_by_stream() {
let delta_batch = output_handles.delta_handle.as_ref().concat();
let num_delta_records = delta_batch.len();
let (mut delta_batch, num_delta_records) = if silent_bootstrap {
let _ = output_handles.delta_handle.take_from_all();
(None, 0)
} else {
let delta_batch = output_handles.delta_handle.as_ref().concat();
let num_delta_records = delta_batch.len();

let mut delta_batch = Some(delta_batch);
(Some(delta_batch), num_delta_records)
};

for (i, endpoint_id) in endpoints.iter().enumerate() {
let endpoint = outputs.lookup_by_id(endpoint_id).unwrap();

// Silent bootstrap: send empty batch for progress tracking only.
if silent_bootstrap {
self.controller.status.enqueue_batch(*endpoint_id, 0);
endpoint.queue.push(BatchQueueEntry {
step: self.step,
data: None,
processed_records,
});
endpoint.unparker.unpark();
continue;
}

if endpoint.created_during_transaction_number
== self.controller.get_transaction_number()
{
Expand Down Expand Up @@ -4143,10 +4185,6 @@ struct StepTrigger {

/// Time between automatic checkpoint syncs.
sync_interval: Option<Duration>,

/// The circuit is bootstrapping. Used to detect the transition from bootstrapping
/// to normal mode.
bootstrapping: bool,
}

/// Action for the controller to take.
Expand All @@ -4167,7 +4205,7 @@ enum Action {

impl StepTrigger {
/// Returns a new [StepTrigger].
fn new(controller: Arc<ControllerInner>, bootstrapping: bool) -> Self {
fn new(controller: Arc<ControllerInner>) -> Self {
let config = &controller.status.pipeline_config.global;
let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs);
let min_batch_size_records = config.min_batch_size_records;
Expand All @@ -4187,7 +4225,6 @@ impl StepTrigger {
max_buffering_delay,
min_batch_size_records,
checkpoint_interval,
bootstrapping,
sync_interval,
}
}
Expand Down Expand Up @@ -4249,16 +4286,7 @@ impl StepTrigger {
}
_ => Some(Action::Park(None)),
}
} else if replaying
|| self.controller.transaction_commit_requested()
|| bootstrapping
|| self.bootstrapping
{
// The `self.bootstrapping` condition above detects a transition
// from bootstrapping to normal operation and makes sure that the
// circuit performs an extra step in the normal mode in order to
// initialize output table snapshots of output relations that did
// not participate in bootstrapping.
} else if replaying || self.controller.transaction_commit_requested() || bootstrapping {
Some(Action::Step)
} else if timer_expired(next_checkpoint, now) && !checkpoint_requested {
Some(Action::Checkpoint)
Expand Down Expand Up @@ -4325,7 +4353,6 @@ impl StepTrigger {
}
};

self.bootstrapping = bootstrapping;
if result == Action::Step {
self.buffer_timeout = None;
}
Expand Down
Loading
Loading