@@ -1895,6 +1895,7 @@ impl CircuitThread {
18951895 match self . controller . advance_transaction_state ( ) {
18961896 Some ( TransactionState :: Started ( transaction_id) ) => {
18971897 info ! ( "Starting transaction {transaction_id}" ) ;
1898+ self . controller . increment_transaction_number ( ) ;
18981899 self . circuit . start_transaction ( ) . unwrap_or_else ( |e| {
18991900 self . controller . error ( Arc :: new ( e. into ( ) ) , None ) ;
19001901 self . controller
@@ -1950,7 +1951,7 @@ impl CircuitThread {
19501951 }
19511952 } else {
19521953 debug ! ( "circuit thread: calling 'circuit.transaction'" ) ;
1953-
1954+ self . controller . increment_transaction_number ( ) ;
19541955 // FIXME: we're using "span" for both step() (above) and transaction() (here).
19551956 SamplySpan :: new ( debug_span ! ( "step" ) )
19561957 . in_scope ( || self . circuit . transaction ( ) )
@@ -2376,6 +2377,18 @@ impl CircuitThread {
23762377 for ( i, endpoint_id) in endpoints. iter ( ) . enumerate ( ) {
23772378 let endpoint = outputs. lookup_by_id ( endpoint_id) . unwrap ( ) ;
23782379
2380+ if endpoint. created_during_transaction_number
2381+ == self . controller . get_transaction_number ( )
2382+ {
2383+ trace ! (
2384+ "Output endpoint '{}' was created during the current transaction (seq. number {}) and will not receive any outputs until the next transaction." ,
2385+ endpoint. endpoint_name, endpoint. created_during_transaction_number
2386+ ) ;
2387+ // We need to propagate processed_records to the connector for progress tracking.
2388+ endpoint. queue . push ( ( self . step , None , processed_records) ) ;
2389+ endpoint. unparker . unpark ( ) ;
2390+ continue ;
2391+ }
23792392 self . controller
23802393 . status
23812394 . enqueue_batch ( * endpoint_id, num_delta_records) ;
@@ -2386,7 +2399,9 @@ impl CircuitThread {
23862399 delta_batch. as_ref ( ) . unwrap ( ) . clone ( )
23872400 } ;
23882401
2389- endpoint. queue . push ( ( self . step , batch, processed_records) ) ;
2402+ endpoint
2403+ . queue
2404+ . push ( ( self . step , Some ( batch) , processed_records) ) ;
23902405
23912406 // Wake up the output thread. We're not trying to be smart here and
23922407 // wake up the thread conditionally if it was previously idle, as I
@@ -3395,7 +3410,7 @@ impl Drop for StatisticsThread {
33953410/// that is equal to the number of input records fully processed by
33963411/// DBSP before emitting this batch of outputs or `None` if the circuit is
33973412/// executing a transaction. The label increases monotonically over time.
3398- type BatchQueue = SegQueue < ( Step , Arc < dyn SyncSerBatchReader > , Option < u64 > ) > ;
3413+ type BatchQueue = SegQueue < ( Step , Option < Arc < dyn SyncSerBatchReader > > , Option < u64 > ) > ;
33993414
34003415/// State tracked by the controller for each output endpoint.
34013416struct OutputEndpointDescr {
@@ -3405,6 +3420,10 @@ struct OutputEndpointDescr {
34053420 /// Stream name that the endpoint is connected to.
34063421 stream_name : String ,
34073422
3423+ /// Transaction number when the endpoint was created.
3424+ /// 0 - the endpoint was created before the first transaction performed by the controller.
3425+ created_during_transaction_number : u64 ,
3426+
34083427 /// FIFO queue of batches read from the stream.
34093428 queue : Arc < BatchQueue > ,
34103429
@@ -3417,12 +3436,18 @@ struct OutputEndpointDescr {
34173436}
34183437
34193438impl OutputEndpointDescr {
3420- pub fn new ( endpoint_name : & str , stream_name : & str , unparker : Unparker ) -> Self {
3439+ pub fn new (
3440+ endpoint_name : & str ,
3441+ stream_name : & str ,
3442+ created_during_transaction_number : u64 ,
3443+ unparker : Unparker ,
3444+ ) -> Self {
34213445 Self {
34223446 endpoint_name : endpoint_name. to_string ( ) ,
34233447 stream_name : canonical_identifier ( stream_name) ,
34243448 queue : Arc :: new ( SegQueue :: new ( ) ) ,
34253449 disconnect_flag : Arc :: new ( AtomicBool :: new ( false ) ) ,
3450+ created_during_transaction_number,
34263451 unparker,
34273452 }
34283453 }
@@ -3470,6 +3495,9 @@ impl OutputEndpoints {
34703495 handles : OutputCollectionHandles ,
34713496 endpoint_descr : OutputEndpointDescr ,
34723497 ) {
3498+ // Enable the accumulator for this output stream.
3499+ // See `struct Accumulator::enable_count` for more details.
3500+ handles. enable_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
34733501 self . by_stream
34743502 . entry ( endpoint_descr. stream_name . clone ( ) )
34753503 . or_insert_with ( || ( handles, BTreeSet :: new ( ) ) )
@@ -3482,7 +3510,12 @@ impl OutputEndpoints {
34823510 self . by_id . remove ( endpoint_id) . inspect ( |descr| {
34833511 self . by_stream
34843512 . get_mut ( & descr. stream_name )
3485- . map ( |( _, endpoints) | endpoints. remove ( endpoint_id) ) ;
3513+ . map ( |( handles, endpoints) | {
3514+ // Disable the accumulator for this output stream.
3515+ let count = handles. enable_count . fetch_sub ( 1 , Ordering :: Relaxed ) ;
3516+ assert ! ( count > 0 ) ;
3517+ endpoints. remove ( endpoint_id)
3518+ } ) ;
34863519 } )
34873520 }
34883521
@@ -3543,23 +3576,25 @@ impl OutputBuffer {
35433576 /// before this batch was produced or `None` if the circuit is executing a transaction.
35443577 fn insert (
35453578 & mut self ,
3546- batch : Arc < dyn SyncSerBatchReader > ,
3579+ batch : Option < Arc < dyn SyncSerBatchReader > > ,
35473580 step : Step ,
35483581 processed_records : Option < u64 > ,
35493582 ) {
3550- if let Some ( buffer) = & mut self . buffer {
3551- for batch in batch. batches ( ) {
3552- buffer. insert ( batch) ;
3553- }
3554- } else {
3555- for batch in batch. batches ( ) {
3556- if let Some ( buffer) = self . buffer . as_mut ( ) {
3583+ if let Some ( batch) = batch {
3584+ if let Some ( buffer) = & mut self . buffer {
3585+ for batch in batch. batches ( ) {
35573586 buffer. insert ( batch) ;
3558- } else {
3559- self . buffer = Some ( batch. into_trace ( ) ) ;
3560- } ;
3587+ }
3588+ } else {
3589+ for batch in batch. batches ( ) {
3590+ if let Some ( buffer) = self . buffer . as_mut ( ) {
3591+ buffer. insert ( batch) ;
3592+ } else {
3593+ self . buffer = Some ( batch. into_trace ( ) ) ;
3594+ } ;
3595+ }
3596+ self . buffer_since = Instant :: now ( ) ;
35613597 }
3562- self . buffer_since = Instant :: now ( ) ;
35633598 }
35643599 self . buffered_step = step;
35653600 if let Some ( records) = processed_records {
@@ -3669,6 +3704,15 @@ pub struct ControllerInner {
36693704 // from the sync context by the circuit thread.
36703705 transaction_info : Mutex < TransactionInfo > ,
36713706
3707+ /// Current transaction number.
3708+ ///
3709+ /// This is not the same as transaction ID. We increment this counter
3710+ /// on each call to `circuit.transaction()` when not running a user-initiated transaction
3711+ /// or on each start_transaction() call when running a user-initiated transaction.
3712+ ///
3713+ /// The counter is 0 before the first transaction, 1 during the first transaction, etc.
3714+ transaction_number : AtomicU64 ,
3715+
36723716 /// Is the circuit thread still restoring from a checkpoint (this includes the journal replay phase)?
36733717 restoring : AtomicBool ,
36743718}
@@ -3720,6 +3764,7 @@ impl ControllerInner {
37203764 fault_tolerance : config. global . fault_tolerance . model ,
37213765 transaction_info : Mutex :: new ( TransactionInfo :: new ( ) ) ,
37223766 restoring : AtomicBool :: new ( config. global . fault_tolerance . is_enabled ( ) ) ,
3767+ transaction_number : AtomicU64 :: new ( 0 ) ,
37233768 } ) ;
37243769 controller. initialize_adhoc_queries ( ) ;
37253770
@@ -3795,6 +3840,14 @@ impl ControllerInner {
37953840 ) )
37963841 }
37973842
3843+ fn get_transaction_number ( & self ) -> u64 {
3844+ self . transaction_number . load ( Ordering :: Acquire )
3845+ }
3846+
3847+ fn increment_transaction_number ( & self ) {
3848+ self . transaction_number . fetch_add ( 1 , Ordering :: AcqRel ) ;
3849+ }
3850+
37983851 fn input_endpoint_id_by_name (
37993852 & self ,
38003853 endpoint_name : & str ,
@@ -4259,8 +4312,12 @@ impl ControllerInner {
42594312 } ;
42604313
42614314 let parker = Parker :: new ( ) ;
4262- let endpoint_descr =
4263- OutputEndpointDescr :: new ( endpoint_name, & stream_name, parker. unparker ( ) . clone ( ) ) ;
4315+ let endpoint_descr = OutputEndpointDescr :: new (
4316+ endpoint_name,
4317+ & stream_name,
4318+ self . get_transaction_number ( ) ,
4319+ parker. unparker ( ) . clone ( ) ,
4320+ ) ;
42644321 let queue = endpoint_descr. queue . clone ( ) ;
42654322 let disconnect_flag = endpoint_descr. disconnect_flag . clone ( ) ;
42664323 let controller = self . clone ( ) ;
@@ -4367,7 +4424,7 @@ impl ControllerInner {
43674424 // buffer; we will check if the buffer needs to be flushed at the next iteration of
43684425 // the loop. If buffering is disabled, push the buffer directly to the encoder.
43694426
4370- let num_records = data. len ( ) ;
4427+ let num_records = data. as_ref ( ) . map_or ( 0 , |b| b . len ( ) ) ;
43714428
43724429 // trace!("Pushing {num_records} records to output endpoint {endpoint_name}");
43734430
@@ -4389,14 +4446,16 @@ impl ControllerInner {
43894446 ) ;
43904447 }
43914448 } else {
4392- Self :: push_batch_to_encoder (
4393- data. as_ref ( ) ,
4394- endpoint_id,
4395- & endpoint_name,
4396- encoder. as_mut ( ) ,
4397- step,
4398- & controller,
4399- ) ;
4449+ if let Some ( data) = data {
4450+ Self :: push_batch_to_encoder (
4451+ data. as_ref ( ) ,
4452+ endpoint_id,
4453+ & endpoint_name,
4454+ encoder. as_mut ( ) ,
4455+ step,
4456+ & controller,
4457+ ) ;
4458+ }
44004459
44014460 // `num_records` output records have been transmitted --
44024461 // update output stats, wake up the circuit thread if the
0 commit comments