@@ -2411,6 +2411,10 @@ struct CircuitThread {
24112411 input_metadata : HashMap < String , Option < Resume > > ,
24122412
24132413 commit_updates : Option < CommitUpdates > ,
2414+
2415+ /// Set to true on startup if the circuit requires bootstrapping.
2416+ /// Cleared when the circuit completes bootstrapping.
2417+ bootstrapping : bool ,
24142418}
24152419
24162420struct CommitUpdates {
@@ -2677,9 +2681,9 @@ impl CircuitThread {
26772681 incarnation_uuid,
26782682 ) ?;
26792683
2680- controller
2681- . status
2682- . set_bootstrap_in_progress ( circuit . bootstrap_in_progress ( ) ) ;
2684+ let bootstrapping = circuit . bootstrap_in_progress ( ) ;
2685+
2686+ controller . status . set_bootstrap_in_progress ( bootstrapping ) ;
26832687
26842688 let input_metadata = input_metadata. map ( |input_metadata| {
26852689 input_metadata
@@ -2695,7 +2699,7 @@ impl CircuitThread {
26952699
26962700 // The pipeline hasn't changed based on input and output persistent id values,
26972701 // yet the circuit is bootstrapping. This is a bug.
2698- if can_replay && circuit . bootstrap_in_progress ( ) {
2702+ if can_replay && bootstrapping {
26992703 return Err ( ControllerError :: UnexpectedBootstrap {
27002704 bootstrap_info : circuit. bootstrap_info ( ) . clone ( ) ,
27012705 } ) ;
@@ -2716,10 +2720,10 @@ impl CircuitThread {
27162720 } ?;
27172721
27182722 // The above code ensures that replay and bootstrapping cannot happen at the same time.
2719- assert ! ( !( ft. is_replaying( ) && circuit . bootstrap_in_progress ( ) ) ) ;
2723+ assert ! ( !( ft. is_replaying( ) && bootstrapping ) ) ;
27202724
27212725 // Disable journaling while we're bootstrapping the circuit.
2722- if circuit . bootstrap_in_progress ( ) {
2726+ if bootstrapping {
27232727 ft. disable ( ) ;
27242728 }
27252729
@@ -2747,6 +2751,7 @@ impl CircuitThread {
27472751 checkpoint_sender,
27482752 input_metadata : input_metadata. unwrap_or_default ( ) ,
27492753 commit_updates : None ,
2754+ bootstrapping,
27502755 } )
27512756 }
27522757
@@ -2775,10 +2780,7 @@ impl CircuitThread {
27752780 // so that if the first step() we perform below before entering the loop
27762781 // ends up finishing bootstrapping, we will still perform an extra step to initialize
27772782 // the output table snapshots inside the loop.
2778- let mut trigger = StepTrigger :: new (
2779- self . controller . clone ( ) ,
2780- self . circuit . bootstrap_in_progress ( ) ,
2781- ) ;
2783+ let mut trigger = StepTrigger :: new ( self . controller . clone ( ) ) ;
27822784 if config. global . cpu_profiler {
27832785 self . circuit . enable_cpu_profiler ( ) . unwrap_or_else ( |e| {
27842786 error ! ( "Failed to enable CPU profiler: {e}" ) ;
@@ -2793,7 +2795,7 @@ impl CircuitThread {
27932795 //
27942796 // Skip this during bootstrap to avoid a slow first step. We don't guarantee
27952797 // that view snapshots are up-to-date until bootstrap is complete.
2796- if !self . circuit . bootstrap_in_progress ( )
2798+ if !self . bootstrapping
27972799 && let Err ( error) = self . step ( )
27982800 {
27992801 let _ = init_status_sender. send ( Err ( error) ) ;
@@ -2867,7 +2869,7 @@ impl CircuitThread {
28672869 self . last_checkpoint ( ) ,
28682870 self . last_checkpoint_sync ( ) ,
28692871 self . replaying ( ) ,
2870- self . circuit . bootstrap_in_progress ( ) ,
2872+ self . controller . status . bootstrap_in_progress ( ) ,
28712873 self . checkpoint_requested ( ) ,
28722874 self . sync_checkpoint_requested ( ) ,
28732875 coordination_request,
@@ -2940,11 +2942,6 @@ impl CircuitThread {
29402942 transaction_state. into_coordination_status ( ) ,
29412943 ) ) ;
29422944
2943- // If bootstrapping has completed, update the status flag.
2944- self . controller
2945- . status
2946- . set_bootstrap_in_progress ( self . circuit . bootstrap_in_progress ( ) ) ;
2947-
29482945 // Update `trace_snapshot` to the latest traces.
29492946 //
29502947 // We do this before updating `total_processed_records` so that ad hoc
@@ -2956,6 +2953,20 @@ impl CircuitThread {
29562953 . with_category ( "Step" )
29572954 . with_tooltip ( || format ! ( "update ad-hoc tables after step {}" , self . step) )
29582955 . in_scope ( || self . update_snapshot ( ) ) ;
2956+
2957+ let bootstrapping = self . circuit . bootstrap_in_progress ( ) ;
2958+
2959+ // If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
2960+ // until the circuit performs an extra transaction to initialize output snapshots
2961+ // (`StepTrigger::trigger` makes sure to force a step as long as `controller.status.bootstrap_in_progress()`
2962+ // is true).
2963+ if self . bootstrapping && !bootstrapping {
2964+ self . bootstrapping = false ;
2965+ } else {
2966+ self . controller
2967+ . status
2968+ . set_bootstrap_in_progress ( self . circuit . bootstrap_in_progress ( ) ) ;
2969+ }
29592970 }
29602971
29612972 // Record that we've processed the records, unless there is a transaction in progress,
@@ -4115,10 +4126,6 @@ struct StepTrigger {
41154126
41164127 /// Time between automatic checkpoint syncs.
41174128 sync_interval : Option < Duration > ,
4118-
4119- /// The circuit is bootstrapping. Used to detect the transition from bootstrapping
4120- /// to normal mode.
4121- bootstrapping : bool ,
41224129}
41234130
41244131/// Action for the controller to take.
@@ -4139,7 +4146,7 @@ enum Action {
41394146
41404147impl StepTrigger {
41414148 /// Returns a new [StepTrigger].
4142- fn new ( controller : Arc < ControllerInner > , bootstrapping : bool ) -> Self {
4149+ fn new ( controller : Arc < ControllerInner > ) -> Self {
41434150 let config = & controller. status . pipeline_config . global ;
41444151 let max_buffering_delay = Duration :: from_micros ( config. max_buffering_delay_usecs ) ;
41454152 let min_batch_size_records = config. min_batch_size_records ;
@@ -4159,7 +4166,6 @@ impl StepTrigger {
41594166 max_buffering_delay,
41604167 min_batch_size_records,
41614168 checkpoint_interval,
4162- bootstrapping,
41634169 sync_interval,
41644170 }
41654171 }
@@ -4221,16 +4227,7 @@ impl StepTrigger {
42214227 }
42224228 _ => Some ( Action :: Park ( None ) ) ,
42234229 }
4224- } else if replaying
4225- || self . controller . transaction_commit_requested ( )
4226- || bootstrapping
4227- || self . bootstrapping
4228- {
4229- // The `self.bootstrapping` condition above detects a transition
4230- // from bootstrapping to normal operation and makes sure that the
4231- // circuit performs an extra step in the normal mode in order to
4232- // initialize output table snapshots of output relations that did
4233- // not participate in bootstrapping.
4230+ } else if replaying || self . controller . transaction_commit_requested ( ) || bootstrapping {
42344231 Some ( Action :: Step )
42354232 } else if timer_expired ( next_checkpoint, now) && !checkpoint_requested {
42364233 Some ( Action :: Checkpoint )
@@ -4297,7 +4294,6 @@ impl StepTrigger {
42974294 }
42984295 } ;
42994296
4300- self . bootstrapping = bootstrapping;
43014297 if result == Action :: Step {
43024298 self . buffer_timeout = None ;
43034299 }
0 commit comments