Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 44 additions & 38 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2491,6 +2491,10 @@ struct CircuitThread {
input_metadata: HashMap<String, Option<Resume>>,

commit_updates: Option<CommitUpdates>,

/// Set to true on startup if the circuit requires bootstrapping.
/// Cleared when the circuit completes bootstrapping.
bootstrapping: bool,
}

struct CommitUpdates {
Expand Down Expand Up @@ -2759,9 +2763,9 @@ impl CircuitThread {
incarnation_uuid,
)?;

controller
.status
.set_bootstrap_in_progress(circuit.bootstrap_in_progress());
let bootstrapping = circuit.bootstrap_in_progress();

controller.status.set_bootstrap_in_progress(bootstrapping);

let input_metadata = input_metadata.map(|input_metadata| {
input_metadata
Expand All @@ -2777,7 +2781,7 @@ impl CircuitThread {

// The pipeline hasn't changed based on input and output persistent id values,
// yet the circuit is bootstrapping. This is a bug.
if can_replay && circuit.bootstrap_in_progress() {
if can_replay && bootstrapping {
return Err(ControllerError::UnexpectedBootstrap {
bootstrap_info: circuit.bootstrap_info().clone(),
});
Expand All @@ -2798,10 +2802,10 @@ impl CircuitThread {
}?;

// The above code ensures that replay and bootstrapping cannot happen at the same time.
assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress()));
assert!(!(ft.is_replaying() && bootstrapping));

// Disable journaling while we're bootstrapping the circuit.
if circuit.bootstrap_in_progress() {
if bootstrapping {
ft.disable();
}

Expand Down Expand Up @@ -2829,6 +2833,7 @@ impl CircuitThread {
checkpoint_sender,
input_metadata: input_metadata.unwrap_or_default(),
commit_updates: None,
bootstrapping,
})
}

Expand Down Expand Up @@ -2857,10 +2862,7 @@ impl CircuitThread {
// so that if the first step() we perform below before entering the loop
// ends up finishing bootstrapping, we will still perform an extra step to initialize
// the output table snapshots inside the loop.
let mut trigger = StepTrigger::new(
self.controller.clone(),
self.circuit.bootstrap_in_progress(),
);
let mut trigger = StepTrigger::new(self.controller.clone());
if config.global.cpu_profiler {
self.circuit.enable_cpu_profiler().unwrap_or_else(|e| {
error!("Failed to enable CPU profiler: {e}");
Expand All @@ -2875,7 +2877,7 @@ impl CircuitThread {
//
// Skip this during bootstrap to avoid a slow first step. We don't guarantee
// that view snapshots are up-to-date until bootstrap is complete.
if !self.circuit.bootstrap_in_progress()
if !self.bootstrapping
&& let Err(error) = self.step()
{
let _ = init_status_sender.send(Err(error));
Expand Down Expand Up @@ -2949,7 +2951,11 @@ impl CircuitThread {
self.last_checkpoint(),
self.last_checkpoint_sync(),
self.replaying(),
self.circuit.bootstrap_in_progress(),
// `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping is complete,
// which is required to initialize the output snapshots.
// We want the trigger to trigger that extra transaction; therefore we pass `status.bootstrap_in_progress`
// rather than `self.bootstrapping` here.
self.controller.status.bootstrap_in_progress(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to reuse self.bootstrapping?
if there is one maybe this needs an explanation

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment explaining this.

self.checkpoint_requested(),
self.sync_checkpoint_requested(),
coordination_request,
Expand Down Expand Up @@ -3022,22 +3028,37 @@ impl CircuitThread {
transaction_state.into_coordination_status(),
));

// If bootstrapping has completed, update the status flag.
self.controller
.status
.set_bootstrap_in_progress(self.circuit.bootstrap_in_progress());

// Update `trace_snapshot` to the latest traces.
//
// We do this before updating `total_processed_records` so that ad hoc
// query results always reflect all data that we have reported
// processing; otherwise, there is a race for any code that runs a query
// as soon as input has been processed.
if transaction_state == TransactionState::None {
Span::new("update")
.with_category("Step")
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
.in_scope(|| self.update_snapshot());
// Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction).
// This guarantees that:
// 1. Ad hoc queries observe a consistent view of the data.
// 2. Ad hoc snapshots are up-to-date before the pipeline is marked as running.
if !self.controller.status.bootstrap_in_progress() {
Span::new("update")
.with_category("Step")
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
.in_scope(|| self.update_snapshot());
}

let bootstrapping = self.circuit.bootstrap_in_progress();

// If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
// until the circuit performs an extra transaction to initialize output snapshots
// (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()`
// is true).
if self.bootstrapping && !bootstrapping {
self.bootstrapping = false;
} else {
self.controller
.status
.set_bootstrap_in_progress(bootstrapping);
}
}

// Record that we've processed the records, unless there is a transaction in progress,
Expand Down Expand Up @@ -4243,10 +4264,6 @@ struct StepTrigger {

/// Time between automatic checkpoint syncs.
sync_interval: Option<Duration>,

/// The circuit is bootstrapping. Used to detect the transition from bootstrapping
/// to normal mode.
bootstrapping: bool,
}

/// Action for the controller to take.
Expand All @@ -4267,7 +4284,7 @@ enum Action {

impl StepTrigger {
/// Returns a new [StepTrigger].
fn new(controller: Arc<ControllerInner>, bootstrapping: bool) -> Self {
fn new(controller: Arc<ControllerInner>) -> Self {
let config = &controller.status.pipeline_config.global;
let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs);
let min_batch_size_records = config.min_batch_size_records;
Expand All @@ -4287,7 +4304,6 @@ impl StepTrigger {
max_buffering_delay,
min_batch_size_records,
checkpoint_interval,
bootstrapping,
sync_interval,
}
}
Expand Down Expand Up @@ -4349,16 +4365,7 @@ impl StepTrigger {
}
_ => Some(Action::Park(None)),
}
} else if replaying
|| self.controller.transaction_commit_requested()
|| bootstrapping
|| self.bootstrapping
{
// The `self.bootstrapping` condition above detects a transition
// from bootstrapping to normal operation and makes sure that the
// circuit performs an extra step in the normal mode in order to
// initialize output table snapshots of output relations that did
// not participate in bootstrapping.
} else if replaying || self.controller.transaction_commit_requested() || bootstrapping {
Some(Action::Step)
} else if timer_expired(next_checkpoint, now) && !checkpoint_requested {
Some(Action::Checkpoint)
Expand Down Expand Up @@ -4425,7 +4432,6 @@ impl StepTrigger {
}
};

self.bootstrapping = bootstrapping;
if result == Action::Step {
self.buffer_timeout = None;
}
Expand Down
27 changes: 21 additions & 6 deletions crates/dbsp/src/circuit/circuit_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1497,10 +1497,20 @@ pub(crate) fn register_replay_stream<C, B>(
// We currently only support using operators in the top-level circuit
// as replay sources.
if TypeId::of::<()>() == TypeId::of::<C::Time>() {
circuit.cache_insert(
ReplaySource::new(stream.stream_id()),
Box::new(replay_stream.clone()),
);
// If a replay source already exists, don't overwrite it. This normally shouldn't
// happen as we should not have more than one integral for each stream. One situation
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment correctly calls this a workaround — could you file a follow-up issue to actually de-duplicate the integrals? Relying on registration order is fragile; a future refactor that swaps the order of input_upsert vs. the downstream join/aggregate construction would silently break replay again, and the symptom ("not materialized") is misleading.

// where this does happen today is for input streams that have an integral without
// an accumulator as part of input_upsert, and another integral with an accumulator
// created by a downstream join or aggregate. In this case, we want to use the former
// for replay, as the latter may have been added in the new version of the program
// and may be empty, while the former can have state (conversely, if the input integral
// is empty, the downstream integral is guaranteed to be empty too).
if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) {
circuit.cache_insert(
ReplaySource::new(stream.stream_id()),
Box::new(replay_stream.clone()),
);
}
}
}

Expand Down Expand Up @@ -7597,10 +7607,15 @@ impl CircuitHandle {
return true;
};

replay_info.replay_sources.keys().all(|node_id| {
// Bootstrapping is finished when all replay sources have completed their replay and the
// transaction has been committed.

let all_complete = replay_info.replay_sources.keys().all(|node_id| {
self.circuit
.map_local_node_mut(*node_id, &mut |node| node.is_replay_complete())
})
});

all_complete && self.is_commit_complete()
}

/// Finalize the replay phase of the circuit.
Expand Down
14 changes: 0 additions & 14 deletions crates/dbsp/src/circuit/dbsp_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1397,20 +1397,6 @@ impl DBSPHandle {
Ok(progress)
}

pub fn set_replay_step_size(&mut self, step_size: usize) {
if let Some(handle) = self.runtime.as_ref() {
handle.runtime().set_replay_step_size(step_size);
}
}

pub fn get_replay_step_size(&self) -> usize {
if let Some(handle) = self.runtime.as_ref() {
handle.runtime().get_replay_step_size()
} else {
0
}
}

/// The circuit has been resumed from a checkpoint and is currently bootstrapping the modified part of the circuit.
pub fn bootstrap_in_progress(&self) -> bool {
self.bootstrap_info.is_some()
Expand Down
Loading
Loading