Skip to content

Commit d33c656

Browse files
committed
[adapters] Update output snapshots before completing bootstrapping.
Fix #6091. Views that don't participate in bootstrapping don't update their snapshots until the first post-bootstrap transaction. As a result, a client making ad hoc queries right after bootstrapping completes could observe empty views. We fix this by: 1. Forcing an extra transaction after bootstrapping completes (this was already the case). 2. Maintaining `bootstrap_in_progress` status until the extra transaction commits. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 95b9a45 commit d33c656

2 files changed

Lines changed: 30 additions & 35 deletions

File tree

crates/adapters/src/controller.rs

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2411,6 +2411,10 @@ struct CircuitThread {
24112411
input_metadata: HashMap<String, Option<Resume>>,
24122412

24132413
commit_updates: Option<CommitUpdates>,
2414+
2415+
/// Set to true on startup if the circuit requires bootstrapping.
2416+
/// Cleared when the circuit completes bootstrapping.
2417+
bootstrapping: bool,
24142418
}
24152419

24162420
struct CommitUpdates {
@@ -2677,9 +2681,9 @@ impl CircuitThread {
26772681
incarnation_uuid,
26782682
)?;
26792683

2680-
controller
2681-
.status
2682-
.set_bootstrap_in_progress(circuit.bootstrap_in_progress());
2684+
let bootstrapping = circuit.bootstrap_in_progress();
2685+
2686+
controller.status.set_bootstrap_in_progress(bootstrapping);
26832687

26842688
let input_metadata = input_metadata.map(|input_metadata| {
26852689
input_metadata
@@ -2695,7 +2699,7 @@ impl CircuitThread {
26952699

26962700
// The pipeline hasn't changed based on input and output persistent id values,
26972701
// yet the circuit is bootstrapping. This is a bug.
2698-
if can_replay && circuit.bootstrap_in_progress() {
2702+
if can_replay && bootstrapping {
26992703
return Err(ControllerError::UnexpectedBootstrap {
27002704
bootstrap_info: circuit.bootstrap_info().clone(),
27012705
});
@@ -2716,10 +2720,10 @@ impl CircuitThread {
27162720
}?;
27172721

27182722
// The above code ensures that replay and bootstrapping cannot happen at the same time.
2719-
assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress()));
2723+
assert!(!(ft.is_replaying() && bootstrapping));
27202724

27212725
// Disable journaling while we're bootstrapping the circuit.
2722-
if circuit.bootstrap_in_progress() {
2726+
if bootstrapping {
27232727
ft.disable();
27242728
}
27252729

@@ -2747,6 +2751,7 @@ impl CircuitThread {
27472751
checkpoint_sender,
27482752
input_metadata: input_metadata.unwrap_or_default(),
27492753
commit_updates: None,
2754+
bootstrapping,
27502755
})
27512756
}
27522757

@@ -2775,10 +2780,7 @@ impl CircuitThread {
27752780
// so that if the first step() we perform below before entering the loop
27762781
// ends up finishing bootstrapping, we will still perform an extra step to initialize
27772782
// the output table snapshots inside the loop.
2778-
let mut trigger = StepTrigger::new(
2779-
self.controller.clone(),
2780-
self.circuit.bootstrap_in_progress(),
2781-
);
2783+
let mut trigger = StepTrigger::new(self.controller.clone());
27822784
if config.global.cpu_profiler {
27832785
self.circuit.enable_cpu_profiler().unwrap_or_else(|e| {
27842786
error!("Failed to enable CPU profiler: {e}");
@@ -2793,7 +2795,7 @@ impl CircuitThread {
27932795
//
27942796
// Skip this during bootstrap to avoid a slow first step. We don't guarantee
27952797
// that view snapshots are up-to-date until bootstrap is complete.
2796-
if !self.circuit.bootstrap_in_progress()
2798+
if !self.bootstrapping
27972799
&& let Err(error) = self.step()
27982800
{
27992801
let _ = init_status_sender.send(Err(error));
@@ -2867,7 +2869,7 @@ impl CircuitThread {
28672869
self.last_checkpoint(),
28682870
self.last_checkpoint_sync(),
28692871
self.replaying(),
2870-
self.circuit.bootstrap_in_progress(),
2872+
self.controller.status.bootstrap_in_progress(),
28712873
self.checkpoint_requested(),
28722874
self.sync_checkpoint_requested(),
28732875
coordination_request,
@@ -2940,11 +2942,6 @@ impl CircuitThread {
29402942
transaction_state.into_coordination_status(),
29412943
));
29422944

2943-
// If bootstrapping has completed, update the status flag.
2944-
self.controller
2945-
.status
2946-
.set_bootstrap_in_progress(self.circuit.bootstrap_in_progress());
2947-
29482945
// Update `trace_snapshot` to the latest traces.
29492946
//
29502947
// We do this before updating `total_processed_records` so that ad hoc
@@ -2956,6 +2953,20 @@ impl CircuitThread {
29562953
.with_category("Step")
29572954
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
29582955
.in_scope(|| self.update_snapshot());
2956+
2957+
let bootstrapping = self.circuit.bootstrap_in_progress();
2958+
2959+
// If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
2960+
// until the circuit performs an extra transaction to initialize output snapshots
2961+
// (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()`
2962+
// is true).
2963+
if self.bootstrapping && !bootstrapping {
2964+
self.bootstrapping = false;
2965+
} else {
2966+
self.controller
2967+
.status
2968+
.set_bootstrap_in_progress(self.circuit.bootstrap_in_progress());
2969+
}
29592970
}
29602971

29612972
// Record that we've processed the records, unless there is a transaction in progress,
@@ -4115,10 +4126,6 @@ struct StepTrigger {
41154126

41164127
/// Time between automatic checkpoint syncs.
41174128
sync_interval: Option<Duration>,
4118-
4119-
/// The circuit is bootstrapping. Used to detect the transition from bootstrapping
4120-
/// to normal mode.
4121-
bootstrapping: bool,
41224129
}
41234130

41244131
/// Action for the controller to take.
@@ -4139,7 +4146,7 @@ enum Action {
41394146

41404147
impl StepTrigger {
41414148
/// Returns a new [StepTrigger].
4142-
fn new(controller: Arc<ControllerInner>, bootstrapping: bool) -> Self {
4149+
fn new(controller: Arc<ControllerInner>) -> Self {
41434150
let config = &controller.status.pipeline_config.global;
41444151
let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs);
41454152
let min_batch_size_records = config.min_batch_size_records;
@@ -4159,7 +4166,6 @@ impl StepTrigger {
41594166
max_buffering_delay,
41604167
min_batch_size_records,
41614168
checkpoint_interval,
4162-
bootstrapping,
41634169
sync_interval,
41644170
}
41654171
}
@@ -4221,16 +4227,7 @@ impl StepTrigger {
42214227
}
42224228
_ => Some(Action::Park(None)),
42234229
}
4224-
} else if replaying
4225-
|| self.controller.transaction_commit_requested()
4226-
|| bootstrapping
4227-
|| self.bootstrapping
4228-
{
4229-
// The `self.bootstrapping` condition above detects a transition
4230-
// from bootstrapping to normal operation and makes sure that the
4231-
// circuit performs an extra step in the normal mode in order to
4232-
// initialize output table snapshots of output relations that did
4233-
// not participate in bootstrapping.
4230+
} else if replaying || self.controller.transaction_commit_requested() || bootstrapping {
42344231
Some(Action::Step)
42354232
} else if timer_expired(next_checkpoint, now) && !checkpoint_requested {
42364233
Some(Action::Checkpoint)
@@ -4297,7 +4294,6 @@ impl StepTrigger {
42974294
}
42984295
};
42994296

4300-
self.bootstrapping = bootstrapping;
43014297
if result == Action::Step {
43024298
self.buffer_timeout = None;
43034299
}

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7554,7 +7554,6 @@ impl CircuitHandle {
75547554
});
75557555

75567556
all_complete && self.is_commit_complete()
7557-
75587557
}
75597558

75607559
/// Finalize the replay phase of the circuit.

0 commit comments

Comments
 (0)