@@ -2395,6 +2395,9 @@ struct CircuitThread {
23952395 ft : Option < FtState > ,
23962396 parker : Parker ,
23972397
2398+ /// Whether to suppress output connector records during bootstrapping.
2399+ silent_bootstrap : bool ,
2400+
23982401 checkpoint_delay_warning : Option < LongOperationWarning > ,
23992402 checkpoint_requests : Vec < CheckpointRequest > ,
24002403 running_checkpoint : Option < RunningCheckpoint > ,
@@ -2598,17 +2601,18 @@ impl CircuitThread {
25982601
25992602 if !diff. is_empty ( ) {
26002603 info ! ( "Pipeline changes detected: {diff}" ) ;
2601- if state. bootstrap_policy ( ) == BootstrapPolicy :: Reject {
2604+ let bootstrap_policy = state. bootstrap_config ( ) . bootstrap_policy ;
2605+ if bootstrap_policy == BootstrapPolicy :: Reject {
26022606 return Err ( ControllerError :: BootstrapRejectedByUser ) ;
2603- } else if state . bootstrap_policy ( ) == BootstrapPolicy :: AwaitApproval {
2607+ } else if bootstrap_policy == BootstrapPolicy :: AwaitApproval {
26042608 info ! ( "Awaiting user approval before bootstrapping modified pipeline." ) ;
26052609 state. set_phase ( PipelinePhase :: Initializing (
26062610 InitializationState :: AwaitingApproval ( Box :: new ( diff. clone ( ) ) ) ,
26072611 ) ) ;
26082612 }
26092613
26102614 loop {
2611- match state. bootstrap_policy ( ) {
2615+ match state. bootstrap_config ( ) . bootstrap_policy {
26122616 BootstrapPolicy :: Allow => {
26132617 info ! (
26142618 "User approved pipeline changes. Proceeding with initialization."
@@ -2758,6 +2762,9 @@ impl CircuitThread {
27582762 backpressure_thread,
27592763 storage,
27602764 parker,
2765+ silent_bootstrap : state
2766+ . map ( |state| state. bootstrap_config ( ) . silent_bootstrap )
2767+ . unwrap_or ( false ) ,
27612768 checkpoint_delay_warning : None ,
27622769 checkpoint_requests : Vec :: new ( ) ,
27632770 running_checkpoint : None ,
@@ -3715,16 +3722,36 @@ impl CircuitThread {
37153722 /// this step. If `processed_records` is `None`, we're in the middle of a
37163723 /// transaction and the records are not fully processed yet.
37173724 fn push_output ( & mut self , processed_records : Option < ProcessedRecords > ) {
3725+ let silent_bootstrap =
3726+ self . silent_bootstrap && self . controller . status . bootstrap_in_progress ( ) ;
3727+
37183728 let outputs = self . controller . outputs . read ( ) . unwrap ( ) ;
37193729 for ( _stream, ( output_handles, endpoints) ) in outputs. iter_by_stream ( ) {
3720- let delta_batch = output_handles. delta_handle . as_ref ( ) . concat ( ) ;
3721- let num_delta_records = delta_batch. len ( ) ;
3730+ let ( mut delta_batch, num_delta_records) = if silent_bootstrap {
3731+ let _ = output_handles. delta_handle . take_from_all ( ) ;
3732+ ( None , 0 )
3733+ } else {
3734+ let delta_batch = output_handles. delta_handle . as_ref ( ) . concat ( ) ;
3735+ let num_delta_records = delta_batch. len ( ) ;
37223736
3723- let mut delta_batch = Some ( delta_batch) ;
3737+ ( Some ( delta_batch) , num_delta_records)
3738+ } ;
37243739
37253740 for ( i, endpoint_id) in endpoints. iter ( ) . enumerate ( ) {
37263741 let endpoint = outputs. lookup_by_id ( endpoint_id) . unwrap ( ) ;
37273742
3743+ // Silent bootstrap: send empty batch for progress tracking only.
3744+ if silent_bootstrap {
3745+ self . controller . status . enqueue_batch ( * endpoint_id, 0 ) ;
3746+ endpoint. queue . push ( BatchQueueEntry {
3747+ step : self . step ,
3748+ data : None ,
3749+ processed_records,
3750+ } ) ;
3751+ endpoint. unparker . unpark ( ) ;
3752+ continue ;
3753+ }
3754+
37283755 if endpoint. created_during_transaction_number
37293756 == self . controller . get_transaction_number ( )
37303757 {
0 commit comments