@@ -2428,6 +2428,10 @@ struct CircuitThread {
24282428 input_metadata : HashMap < String , Option < Resume > > ,
24292429
24302430 commit_updates : Option < CommitUpdates > ,
2431+
2432+ /// Set to true on startup if the circuit requires bootstrapping.
2433+ /// Cleared when the circuit completes bootstrapping.
2434+ bootstrapping : bool ,
24312435}
24322436
24332437struct CommitUpdates {
@@ -2694,9 +2698,9 @@ impl CircuitThread {
26942698 incarnation_uuid,
26952699 ) ?;
26962700
2697- controller
2698- . status
2699- . set_bootstrap_in_progress ( circuit . bootstrap_in_progress ( ) ) ;
2701+ let bootstrapping = circuit . bootstrap_in_progress ( ) ;
2702+
2703+ controller . status . set_bootstrap_in_progress ( bootstrapping ) ;
27002704
27012705 let input_metadata = input_metadata. map ( |input_metadata| {
27022706 input_metadata
@@ -2712,7 +2716,7 @@ impl CircuitThread {
27122716
27132717 // The pipeline hasn't changed based on input and output persistent id values,
27142718 // yet the circuit is bootstrapping. This is a bug.
2715- if can_replay && circuit . bootstrap_in_progress ( ) {
2719+ if can_replay && bootstrapping {
27162720 return Err ( ControllerError :: UnexpectedBootstrap {
27172721 bootstrap_info : circuit. bootstrap_info ( ) . clone ( ) ,
27182722 } ) ;
@@ -2733,10 +2737,10 @@ impl CircuitThread {
27332737 } ?;
27342738
27352739 // The above code ensures that replay and bootstrapping cannot happen at the same time.
2736- assert ! ( !( ft. is_replaying( ) && circuit . bootstrap_in_progress ( ) ) ) ;
2740+ assert ! ( !( ft. is_replaying( ) && bootstrapping ) ) ;
27372741
27382742 // Disable journaling while we're bootstrapping the circuit.
2739- if circuit . bootstrap_in_progress ( ) {
2743+ if bootstrapping {
27402744 ft. disable ( ) ;
27412745 }
27422746
@@ -2764,6 +2768,7 @@ impl CircuitThread {
27642768 checkpoint_sender,
27652769 input_metadata : input_metadata. unwrap_or_default ( ) ,
27662770 commit_updates : None ,
2771+ bootstrapping,
27672772 } )
27682773 }
27692774
@@ -2792,10 +2797,7 @@ impl CircuitThread {
27922797 // so that if the first step() we perform below before entering the loop
27932798 // ends up finishing bootstrapping, we will still perform an extra step to initialize
27942799 // the output table snapshots inside the loop.
2795- let mut trigger = StepTrigger :: new (
2796- self . controller . clone ( ) ,
2797- self . circuit . bootstrap_in_progress ( ) ,
2798- ) ;
2800+ let mut trigger = StepTrigger :: new ( self . controller . clone ( ) ) ;
27992801 if config. global . cpu_profiler {
28002802 self . circuit . enable_cpu_profiler ( ) . unwrap_or_else ( |e| {
28012803 error ! ( "Failed to enable CPU profiler: {e}" ) ;
@@ -2810,7 +2812,7 @@ impl CircuitThread {
28102812 //
28112813 // Skip this during bootstrap to avoid a slow first step. We don't guarantee
28122814 // that view snapshots are up-to-date until bootstrap is complete.
2813- if !self . circuit . bootstrap_in_progress ( )
2815+ if !self . bootstrapping
28142816 && let Err ( error) = self . step ( )
28152817 {
28162818 let _ = init_status_sender. send ( Err ( error) ) ;
@@ -2884,7 +2886,11 @@ impl CircuitThread {
28842886 self . last_checkpoint ( ) ,
28852887 self . last_checkpoint_sync ( ) ,
28862888 self . replaying ( ) ,
2887- self . circuit . bootstrap_in_progress ( ) ,
2889+ // `status.bootstrap_in_progress` is cleared one transaction after circuit bootstrapping is complete,
2890+ // which is required to initialize the output snapshots.
2891+ // We want the trigger to trigger that extra transaction; therefore we pass `status.bootstrap_in_progress`
2892+ // rather than `self.bootstrapping` here.
2893+ self . controller . status . bootstrap_in_progress ( ) ,
28882894 self . checkpoint_requested ( ) ,
28892895 self . sync_checkpoint_requested ( ) ,
28902896 coordination_request,
@@ -2957,11 +2963,6 @@ impl CircuitThread {
29572963 transaction_state. into_coordination_status ( ) ,
29582964 ) ) ;
29592965
2960- // If bootstrapping has completed, update the status flag.
2961- self . controller
2962- . status
2963- . set_bootstrap_in_progress ( self . circuit . bootstrap_in_progress ( ) ) ;
2964-
29652966 // Update `trace_snapshot` to the latest traces.
29662967 //
29672968 // We do this before updating `total_processed_records` so that ad hoc
@@ -2973,6 +2974,20 @@ impl CircuitThread {
29732974 . with_category ( "Step" )
29742975 . with_tooltip ( || format ! ( "update ad-hoc tables after step {}" , self . step) )
29752976 . in_scope ( || self . update_snapshot ( ) ) ;
2977+
2978+ let bootstrapping = self . circuit . bootstrap_in_progress ( ) ;
2979+
2980+ // If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
2981+ // until the circuit performs an extra transaction to initialize output snapshots
2982+ // (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()`
2983+ // is true).
2984+ if self . bootstrapping && !bootstrapping {
2985+ self . bootstrapping = false ;
2986+ } else {
2987+ self . controller
2988+ . status
2989+ . set_bootstrap_in_progress ( bootstrapping) ;
2990+ }
29762991 }
29772992
29782993 // Record that we've processed the records, unless there is a transaction in progress,
@@ -4143,10 +4158,6 @@ struct StepTrigger {
41434158
41444159 /// Time between automatic checkpoint syncs.
41454160 sync_interval : Option < Duration > ,
4146-
4147- /// The circuit is bootstrapping. Used to detect the transition from bootstrapping
4148- /// to normal mode.
4149- bootstrapping : bool ,
41504161}
41514162
41524163/// Action for the controller to take.
@@ -4167,7 +4178,7 @@ enum Action {
41674178
41684179impl StepTrigger {
41694180 /// Returns a new [StepTrigger].
4170- fn new ( controller : Arc < ControllerInner > , bootstrapping : bool ) -> Self {
4181+ fn new ( controller : Arc < ControllerInner > ) -> Self {
41714182 let config = & controller. status . pipeline_config . global ;
41724183 let max_buffering_delay = Duration :: from_micros ( config. max_buffering_delay_usecs ) ;
41734184 let min_batch_size_records = config. min_batch_size_records ;
@@ -4187,7 +4198,6 @@ impl StepTrigger {
41874198 max_buffering_delay,
41884199 min_batch_size_records,
41894200 checkpoint_interval,
4190- bootstrapping,
41914201 sync_interval,
41924202 }
41934203 }
@@ -4249,16 +4259,7 @@ impl StepTrigger {
42494259 }
42504260 _ => Some ( Action :: Park ( None ) ) ,
42514261 }
4252- } else if replaying
4253- || self . controller . transaction_commit_requested ( )
4254- || bootstrapping
4255- || self . bootstrapping
4256- {
4257- // The `self.bootstrapping` condition above detects a transition
4258- // from bootstrapping to normal operation and makes sure that the
4259- // circuit performs an extra step in the normal mode in order to
4260- // initialize output table snapshots of output relations that did
4261- // not participate in bootstrapping.
4262+ } else if replaying || self . controller . transaction_commit_requested ( ) || bootstrapping {
42624263 Some ( Action :: Step )
42634264 } else if timer_expired ( next_checkpoint, now) && !checkpoint_requested {
42644265 Some ( Action :: Checkpoint )
@@ -4325,7 +4326,6 @@ impl StepTrigger {
43254326 }
43264327 } ;
43274328
4328- self . bootstrapping = bootstrapping;
43294329 if result == Action :: Step {
43304330 self . buffer_timeout = None ;
43314331 }
0 commit comments