Skip to content

Commit 7148c81

Browse files
committed
[adapters] Update output snapshots before completing bootstrapping.
Fix #6091. Views that don't participate in bootstrapping don't update their snapshots until the first post-bootstrap transaction. As a result, a client making ad hoc queries right after bootstrapping completes could observe empty views. We fix this by: 1. Forcing an extra transaction after bootstrapping completes (this was already the case). 2. Maintaining `bootstrap_in_progress` status until the extra transaction commits. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 3f7b6e4 commit 7148c81

2 files changed

Lines changed: 34 additions & 35 deletions

File tree

crates/adapters/src/controller.rs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2428,6 +2428,10 @@ struct CircuitThread {
24282428
input_metadata: HashMap<String, Option<Resume>>,
24292429

24302430
commit_updates: Option<CommitUpdates>,
2431+
2432+
/// Set to true on startup if the circuit requires bootstrapping.
2433+
/// Cleared when the circuit completes bootstrapping.
2434+
bootstrapping: bool,
24312435
}
24322436

24332437
struct CommitUpdates {
@@ -2694,9 +2698,9 @@ impl CircuitThread {
26942698
incarnation_uuid,
26952699
)?;
26962700

2697-
controller
2698-
.status
2699-
.set_bootstrap_in_progress(circuit.bootstrap_in_progress());
2701+
let bootstrapping = circuit.bootstrap_in_progress();
2702+
2703+
controller.status.set_bootstrap_in_progress(bootstrapping);
27002704

27012705
let input_metadata = input_metadata.map(|input_metadata| {
27022706
input_metadata
@@ -2712,7 +2716,7 @@ impl CircuitThread {
27122716

27132717
// The pipeline hasn't changed based on input and output persistent id values,
27142718
// yet the circuit is bootstrapping. This is a bug.
2715-
if can_replay && circuit.bootstrap_in_progress() {
2719+
if can_replay && bootstrapping {
27162720
return Err(ControllerError::UnexpectedBootstrap {
27172721
bootstrap_info: circuit.bootstrap_info().clone(),
27182722
});
@@ -2733,10 +2737,10 @@ impl CircuitThread {
27332737
}?;
27342738

27352739
// The above code ensures that replay and bootstrapping cannot happen at the same time.
2736-
assert!(!(ft.is_replaying() && circuit.bootstrap_in_progress()));
2740+
assert!(!(ft.is_replaying() && bootstrapping));
27372741

27382742
// Disable journaling while we're bootstrapping the circuit.
2739-
if circuit.bootstrap_in_progress() {
2743+
if bootstrapping {
27402744
ft.disable();
27412745
}
27422746

@@ -2764,6 +2768,7 @@ impl CircuitThread {
27642768
checkpoint_sender,
27652769
input_metadata: input_metadata.unwrap_or_default(),
27662770
commit_updates: None,
2771+
bootstrapping,
27672772
})
27682773
}
27692774

@@ -2792,10 +2797,7 @@ impl CircuitThread {
27922797
// so that if the first step() we perform below before entering the loop
27932798
// ends up finishing bootstrapping, we will still perform an extra step to initialize
27942799
// the output table snapshots inside the loop.
2795-
let mut trigger = StepTrigger::new(
2796-
self.controller.clone(),
2797-
self.circuit.bootstrap_in_progress(),
2798-
);
2800+
let mut trigger = StepTrigger::new(self.controller.clone());
27992801
if config.global.cpu_profiler {
28002802
self.circuit.enable_cpu_profiler().unwrap_or_else(|e| {
28012803
error!("Failed to enable CPU profiler: {e}");
@@ -2810,7 +2812,7 @@ impl CircuitThread {
28102812
//
28112813
// Skip this during bootstrap to avoid a slow first step. We don't guarantee
28122814
// that view snapshots are up-to-date until bootstrap is complete.
2813-
if !self.circuit.bootstrap_in_progress()
2815+
if !self.bootstrapping
28142816
&& let Err(error) = self.step()
28152817
{
28162818
let _ = init_status_sender.send(Err(error));
@@ -2884,7 +2886,11 @@ impl CircuitThread {
28842886
self.last_checkpoint(),
28852887
self.last_checkpoint_sync(),
28862888
self.replaying(),
2887-
self.circuit.bootstrap_in_progress(),
2889+
// `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping is complete,
2890+
// which is required to initialize the output snapshots.
2891+
// We want the trigger to trigger that extra transaction; therefore we pass `status.bootstrap_in_progress`
2892+
// rather than `self.bootstrapping` here.
2893+
self.controller.status.bootstrap_in_progress(),
28882894
self.checkpoint_requested(),
28892895
self.sync_checkpoint_requested(),
28902896
coordination_request,
@@ -2957,11 +2963,6 @@ impl CircuitThread {
29572963
transaction_state.into_coordination_status(),
29582964
));
29592965

2960-
// If bootstrapping has completed, update the status flag.
2961-
self.controller
2962-
.status
2963-
.set_bootstrap_in_progress(self.circuit.bootstrap_in_progress());
2964-
29652966
// Update `trace_snapshot` to the latest traces.
29662967
//
29672968
// We do this before updating `total_processed_records` so that ad hoc
@@ -2973,6 +2974,20 @@ impl CircuitThread {
29732974
.with_category("Step")
29742975
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
29752976
.in_scope(|| self.update_snapshot());
2977+
2978+
let bootstrapping = self.circuit.bootstrap_in_progress();
2979+
2980+
// If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
2981+
// until the circuit performs an extra transaction to initialize output snapshots
2982+
// (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()`
2983+
// is true).
2984+
if self.bootstrapping && !bootstrapping {
2985+
self.bootstrapping = false;
2986+
} else {
2987+
self.controller
2988+
.status
2989+
.set_bootstrap_in_progress(bootstrapping);
2990+
}
29762991
}
29772992

29782993
// Record that we've processed the records, unless there is a transaction in progress,
@@ -4143,10 +4158,6 @@ struct StepTrigger {
41434158

41444159
/// Time between automatic checkpoint syncs.
41454160
sync_interval: Option<Duration>,
4146-
4147-
/// The circuit is bootstrapping. Used to detect the transition from bootstrapping
4148-
/// to normal mode.
4149-
bootstrapping: bool,
41504161
}
41514162

41524163
/// Action for the controller to take.
@@ -4167,7 +4178,7 @@ enum Action {
41674178

41684179
impl StepTrigger {
41694180
/// Returns a new [StepTrigger].
4170-
fn new(controller: Arc<ControllerInner>, bootstrapping: bool) -> Self {
4181+
fn new(controller: Arc<ControllerInner>) -> Self {
41714182
let config = &controller.status.pipeline_config.global;
41724183
let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs);
41734184
let min_batch_size_records = config.min_batch_size_records;
@@ -4187,7 +4198,6 @@ impl StepTrigger {
41874198
max_buffering_delay,
41884199
min_batch_size_records,
41894200
checkpoint_interval,
4190-
bootstrapping,
41914201
sync_interval,
41924202
}
41934203
}
@@ -4249,16 +4259,7 @@ impl StepTrigger {
42494259
}
42504260
_ => Some(Action::Park(None)),
42514261
}
4252-
} else if replaying
4253-
|| self.controller.transaction_commit_requested()
4254-
|| bootstrapping
4255-
|| self.bootstrapping
4256-
{
4257-
// The `self.bootstrapping` condition above detects a transition
4258-
// from bootstrapping to normal operation and makes sure that the
4259-
// circuit performs an extra step in the normal mode in order to
4260-
// initialize output table snapshots of output relations that did
4261-
// not participate in bootstrapping.
4262+
} else if replaying || self.controller.transaction_commit_requested() || bootstrapping {
42624263
Some(Action::Step)
42634264
} else if timer_expired(next_checkpoint, now) && !checkpoint_requested {
42644265
Some(Action::Checkpoint)
@@ -4325,7 +4326,6 @@ impl StepTrigger {
43254326
}
43264327
};
43274328

4328-
self.bootstrapping = bootstrapping;
43294329
if result == Action::Step {
43304330
self.buffer_timeout = None;
43314331
}

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7616,7 +7616,6 @@ impl CircuitHandle {
76167616
});
76177617

76187618
all_complete && self.is_commit_complete()
7619-
76207619
}
76217620

76227621
/// Finalize the replay phase of the circuit.

0 commit comments

Comments
 (0)