Skip to content

Commit f79b40f

Browse files
committed
[adapters] Don't accumulate outputs without connectors.
Fixes #4967. If a table or view doesn't have at least one connector attached to it, don't accumulate and produce its output. This optimization is particularly significant during backfill with transactions, where the accumulator stores the entire table or view snapshot, which ends up getting discarded.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent fd08265 commit f79b40f

File tree

7 files changed

+448
-51
lines changed

7 files changed

+448
-51
lines changed

crates/adapterlib/src/catalog.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
use std::any::Any;
22
use std::collections::HashSet;
33
use std::fmt::{Debug, Formatter};
4+
use std::sync::atomic::AtomicUsize;
45
use std::sync::Arc;
56

67
use anyhow::Result as AnyResult;
@@ -640,4 +641,9 @@ pub struct OutputCollectionHandles {
640641

641642
/// A stream of changes to the collection.
642643
pub delta_handle: Box<dyn SerBatchReaderHandle>,
644+
645+
/// Reference to the enable count of the accumulator used to collect updates to this stream.
646+
/// Incremented every time an output connector is attached to this stream; decremented when
647+
/// the output connector is detached.
648+
pub enable_count: Arc<AtomicUsize>,
643649
}

crates/adapters/src/controller.rs

Lines changed: 87 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -1895,6 +1895,7 @@ impl CircuitThread {
18951895
match self.controller.advance_transaction_state() {
18961896
Some(TransactionState::Started(transaction_id)) => {
18971897
info!("Starting transaction {transaction_id}");
1898+
self.controller.increment_transaction_number();
18981899
self.circuit.start_transaction().unwrap_or_else(|e| {
18991900
self.controller.error(Arc::new(e.into()), None);
19001901
self.controller
@@ -1950,7 +1951,7 @@ impl CircuitThread {
19501951
}
19511952
} else {
19521953
debug!("circuit thread: calling 'circuit.transaction'");
1953-
1954+
self.controller.increment_transaction_number();
19541955
// FIXME: we're using "span" for both step() (above) and transaction() (here).
19551956
SamplySpan::new(debug_span!("step"))
19561957
.in_scope(|| self.circuit.transaction())
@@ -2376,6 +2377,18 @@ impl CircuitThread {
23762377
for (i, endpoint_id) in endpoints.iter().enumerate() {
23772378
let endpoint = outputs.lookup_by_id(endpoint_id).unwrap();
23782379

2380+
if endpoint.created_during_transaction_number
2381+
== self.controller.get_transaction_number()
2382+
{
2383+
trace!(
2384+
"Output endpoint '{}' was created during the current transaction (seq. number {}) and will not receive any outputs until the next transaction.",
2385+
endpoint.endpoint_name, endpoint.created_during_transaction_number
2386+
);
2387+
// We need to propagate processed_records to the connector for progress tracking.
2388+
endpoint.queue.push((self.step, None, processed_records));
2389+
endpoint.unparker.unpark();
2390+
continue;
2391+
}
23792392
self.controller
23802393
.status
23812394
.enqueue_batch(*endpoint_id, num_delta_records);
@@ -2386,7 +2399,9 @@ impl CircuitThread {
23862399
delta_batch.as_ref().unwrap().clone()
23872400
};
23882401

2389-
endpoint.queue.push((self.step, batch, processed_records));
2402+
endpoint
2403+
.queue
2404+
.push((self.step, Some(batch), processed_records));
23902405

23912406
// Wake up the output thread. We're not trying to be smart here and
23922407
// wake up the thread conditionally if it was previously idle, as I
@@ -3395,7 +3410,7 @@ impl Drop for StatisticsThread {
33953410
/// that is equal to the number of input records fully processed by
33963411
/// DBSP before emitting this batch of outputs or `None` if the circuit is
33973412
/// executing a transaction. The label increases monotonically over time.
3398-
type BatchQueue = SegQueue<(Step, Arc<dyn SyncSerBatchReader>, Option<u64>)>;
3413+
type BatchQueue = SegQueue<(Step, Option<Arc<dyn SyncSerBatchReader>>, Option<u64>)>;
33993414

34003415
/// State tracked by the controller for each output endpoint.
34013416
struct OutputEndpointDescr {
@@ -3405,6 +3420,10 @@ struct OutputEndpointDescr {
34053420
/// Stream name that the endpoint is connected to.
34063421
stream_name: String,
34073422

3423+
/// Transaction number when the endpoint was created.
3424+
/// 0 - the endpoint was created before the first transaction performed by the controller.
3425+
created_during_transaction_number: u64,
3426+
34083427
/// FIFO queue of batches read from the stream.
34093428
queue: Arc<BatchQueue>,
34103429

@@ -3417,12 +3436,18 @@ struct OutputEndpointDescr {
34173436
}
34183437

34193438
impl OutputEndpointDescr {
3420-
pub fn new(endpoint_name: &str, stream_name: &str, unparker: Unparker) -> Self {
3439+
pub fn new(
3440+
endpoint_name: &str,
3441+
stream_name: &str,
3442+
created_during_transaction_number: u64,
3443+
unparker: Unparker,
3444+
) -> Self {
34213445
Self {
34223446
endpoint_name: endpoint_name.to_string(),
34233447
stream_name: canonical_identifier(stream_name),
34243448
queue: Arc::new(SegQueue::new()),
34253449
disconnect_flag: Arc::new(AtomicBool::new(false)),
3450+
created_during_transaction_number,
34263451
unparker,
34273452
}
34283453
}
@@ -3470,6 +3495,9 @@ impl OutputEndpoints {
34703495
handles: OutputCollectionHandles,
34713496
endpoint_descr: OutputEndpointDescr,
34723497
) {
3498+
// Enable the accumulator for this output stream.
3499+
// See `struct Accumulator::enable_count` for more details.
3500+
handles.enable_count.fetch_add(1, Ordering::Relaxed);
34733501
self.by_stream
34743502
.entry(endpoint_descr.stream_name.clone())
34753503
.or_insert_with(|| (handles, BTreeSet::new()))
@@ -3482,7 +3510,12 @@ impl OutputEndpoints {
34823510
self.by_id.remove(endpoint_id).inspect(|descr| {
34833511
self.by_stream
34843512
.get_mut(&descr.stream_name)
3485-
.map(|(_, endpoints)| endpoints.remove(endpoint_id));
3513+
.map(|(handles, endpoints)| {
3514+
// Disable the accumulator for this output stream.
3515+
let count = handles.enable_count.fetch_sub(1, Ordering::Relaxed);
3516+
assert!(count > 0);
3517+
endpoints.remove(endpoint_id)
3518+
});
34863519
})
34873520
}
34883521

@@ -3543,23 +3576,25 @@ impl OutputBuffer {
35433576
/// before this batch was produced or `None` if the circuit is executing a transaction.
35443577
fn insert(
35453578
&mut self,
3546-
batch: Arc<dyn SyncSerBatchReader>,
3579+
batch: Option<Arc<dyn SyncSerBatchReader>>,
35473580
step: Step,
35483581
processed_records: Option<u64>,
35493582
) {
3550-
if let Some(buffer) = &mut self.buffer {
3551-
for batch in batch.batches() {
3552-
buffer.insert(batch);
3553-
}
3554-
} else {
3555-
for batch in batch.batches() {
3556-
if let Some(buffer) = self.buffer.as_mut() {
3583+
if let Some(batch) = batch {
3584+
if let Some(buffer) = &mut self.buffer {
3585+
for batch in batch.batches() {
35573586
buffer.insert(batch);
3558-
} else {
3559-
self.buffer = Some(batch.into_trace());
3560-
};
3587+
}
3588+
} else {
3589+
for batch in batch.batches() {
3590+
if let Some(buffer) = self.buffer.as_mut() {
3591+
buffer.insert(batch);
3592+
} else {
3593+
self.buffer = Some(batch.into_trace());
3594+
};
3595+
}
3596+
self.buffer_since = Instant::now();
35613597
}
3562-
self.buffer_since = Instant::now();
35633598
}
35643599
self.buffered_step = step;
35653600
if let Some(records) = processed_records {
@@ -3669,6 +3704,15 @@ pub struct ControllerInner {
36693704
// from the sync context by the circuit thread.
36703705
transaction_info: Mutex<TransactionInfo>,
36713706

3707+
/// Current transaction number.
3708+
///
3709+
/// This is not the same as transaction ID. We increment this counter
3710+
/// on each call to `circuit.transaction()` when not running a user-initiated transaction
3711+
/// or on each start_transaction() call when running a user-initiated transaction.
3712+
///
3713+
/// The counter is 0 before the first transaction, 1 during the first transaction, etc.
3714+
transaction_number: AtomicU64,
3715+
36723716
/// Is the circuit thread still restoring from a checkpoint (this includes the journal replay phase)?
36733717
restoring: AtomicBool,
36743718
}
@@ -3720,6 +3764,7 @@ impl ControllerInner {
37203764
fault_tolerance: config.global.fault_tolerance.model,
37213765
transaction_info: Mutex::new(TransactionInfo::new()),
37223766
restoring: AtomicBool::new(config.global.fault_tolerance.is_enabled()),
3767+
transaction_number: AtomicU64::new(0),
37233768
});
37243769
controller.initialize_adhoc_queries();
37253770

@@ -3795,6 +3840,14 @@ impl ControllerInner {
37953840
))
37963841
}
37973842

3843+
fn get_transaction_number(&self) -> u64 {
3844+
self.transaction_number.load(Ordering::Acquire)
3845+
}
3846+
3847+
fn increment_transaction_number(&self) {
3848+
self.transaction_number.fetch_add(1, Ordering::AcqRel);
3849+
}
3850+
37983851
fn input_endpoint_id_by_name(
37993852
&self,
38003853
endpoint_name: &str,
@@ -4259,8 +4312,12 @@ impl ControllerInner {
42594312
};
42604313

42614314
let parker = Parker::new();
4262-
let endpoint_descr =
4263-
OutputEndpointDescr::new(endpoint_name, &stream_name, parker.unparker().clone());
4315+
let endpoint_descr = OutputEndpointDescr::new(
4316+
endpoint_name,
4317+
&stream_name,
4318+
self.get_transaction_number(),
4319+
parker.unparker().clone(),
4320+
);
42644321
let queue = endpoint_descr.queue.clone();
42654322
let disconnect_flag = endpoint_descr.disconnect_flag.clone();
42664323
let controller = self.clone();
@@ -4367,7 +4424,7 @@ impl ControllerInner {
43674424
// buffer; we will check if the buffer needs to be flushed at the next iteration of
43684425
// the loop. If buffering is disabled, push the buffer directly to the encoder.
43694426

4370-
let num_records = data.len();
4427+
let num_records = data.as_ref().map_or(0, |b| b.len());
43714428

43724429
// trace!("Pushing {num_records} records to output endpoint {endpoint_name}");
43734430

@@ -4389,14 +4446,16 @@ impl ControllerInner {
43894446
);
43904447
}
43914448
} else {
4392-
Self::push_batch_to_encoder(
4393-
data.as_ref(),
4394-
endpoint_id,
4395-
&endpoint_name,
4396-
encoder.as_mut(),
4397-
step,
4398-
&controller,
4399-
);
4449+
if let Some(data) = data {
4450+
Self::push_batch_to_encoder(
4451+
data.as_ref(),
4452+
endpoint_id,
4453+
&endpoint_name,
4454+
encoder.as_mut(),
4455+
step,
4456+
&controller,
4457+
);
4458+
}
44004459

44014460
// `num_records` output records have been transmitted --
44024461
// update output stats, wake up the circuit thread if the

crates/adapters/src/static_compile/catalog.rs

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -380,7 +380,8 @@ impl Catalog {
380380
}
381381

382382
// Create handle for the stream itself.
383-
let (delta_handle, delta_gid) = stream.accumulate_output_persistent_with_gid(persistent_id);
383+
let (delta_handle, enable_count, delta_gid) =
384+
stream.accumulate_output_persistent_with_gid(persistent_id);
384385
stream.circuit().set_mir_node_id(&delta_gid, persistent_id);
385386

386387
let handles = OutputCollectionHandles {
@@ -389,6 +390,7 @@ impl Catalog {
389390
index_of: None,
390391
delta_handle: Box::new(<SerCollectionHandleImpl<_, D, ()>>::new(delta_handle))
391392
as Box<dyn SerBatchReaderHandle>,
393+
enable_count,
392394
integrate_handle_is_indexed: false,
393395
integrate_handle: None,
394396
};
@@ -445,7 +447,8 @@ impl Catalog {
445447
let stream = stream.shard();
446448

447449
// Create handle for the stream itself.
448-
let (delta_handle, delta_gid) = stream.accumulate_output_persistent_with_gid(persistent_id);
450+
let (delta_handle, enable_count, delta_gid) =
451+
stream.accumulate_output_persistent_with_gid(persistent_id);
449452
stream.circuit().set_mir_node_id(&delta_gid, persistent_id);
450453

451454
let (integrate_handle, integrate_gid) = stream
@@ -468,6 +471,7 @@ impl Catalog {
468471
)) as Arc<dyn SerBatchReaderHandle>),
469472
delta_handle: Box::new(<SerCollectionHandleImpl<_, D, ()>>::new(delta_handle))
470473
as Box<dyn SerBatchReaderHandle>,
474+
enable_count,
471475
};
472476

473477
self.register_output_batch_handles(&name, handles).unwrap();
@@ -598,7 +602,8 @@ impl Catalog {
598602
.as_deref(),
599603
);
600604

601-
let (delta_handle, delta_gid) = delta.accumulate_output_persistent_with_gid(persistent_id);
605+
let (delta_handle, enable_count, delta_gid) =
606+
delta.accumulate_output_persistent_with_gid(persistent_id);
602607
stream.circuit().set_mir_node_id(&delta_gid, persistent_id);
603608

604609
let integrate_handle = if materialized {
@@ -628,6 +633,7 @@ impl Catalog {
628633
index_of: None,
629634
delta_handle: Box::new(<SerCollectionHandleImpl<_, VD, ()>>::new(delta_handle))
630635
as Box<dyn SerBatchReaderHandle>,
636+
enable_count,
631637
integrate_handle_is_indexed: true,
632638
integrate_handle,
633639
};
@@ -696,7 +702,7 @@ impl Catalog {
696702

697703
let view_handles = self.output_handles(view_name)?;
698704

699-
let (stream_handle, stream_gid) =
705+
let (stream_handle, enable_count, stream_gid) =
700706
stream.accumulate_output_persistent_with_gid(persistent_id);
701707
stream.circuit().set_mir_node_id(&stream_gid, persistent_id);
702708

@@ -710,6 +716,7 @@ impl Catalog {
710716
index_of: Some(view_name.clone()),
711717
delta_handle: Box::new(<SerCollectionHandleImpl<_, KD, VD>>::new(stream_handle))
712718
as Box<dyn SerBatchReaderHandle>,
719+
enable_count,
713720
integrate_handle_is_indexed: false,
714721
integrate_handle: None,
715722
};
@@ -742,7 +749,7 @@ fn index_schema(
742749

743750
#[cfg(test)]
744751
mod test {
745-
use std::{io::Write, ops::Deref};
752+
use std::{io::Write, ops::Deref, sync::atomic::Ordering};
746753

747754
use crate::{catalog::RecordFormat, test::TestStruct, Catalog, CircuitCatalog};
748755
use dbsp::Runtime;
@@ -793,6 +800,9 @@ mod test {
793800
.unwrap();
794801

795802
let output_stream_handles = catalog.output_handles(&("Input_map".into())).unwrap();
803+
output_stream_handles
804+
.enable_count
805+
.fetch_add(1, Ordering::AcqRel);
796806

797807
// Step 1: insert a couple of values.
798808

crates/dbsp/src/operator/accumulator.rs

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,9 @@
1-
use std::{borrow::Cow, marker::PhantomData, panic::Location};
1+
use std::{
2+
borrow::Cow,
3+
marker::PhantomData,
4+
panic::Location,
5+
sync::{atomic::AtomicUsize, Arc},
6+
};
27

38
use crate::{
49
circuit::{
@@ -35,6 +40,18 @@ where
3540
unsafe { result.transmute_payload() }
3641
}
3742

43+
/// Like [`Self::accumulate`], but also returns a reference to the enable count of the accumulator.
44+
///
45+
/// Used to instantiate accumulators for output connectors. See `Accumulator::enable_count` documentation.
46+
#[track_caller]
47+
pub fn accumulate_with_enable_count(&self) -> (Stream<C, Option<Spine<B>>>, Arc<AtomicUsize>) {
48+
let factories = BatchReaderFactories::new::<B::Key, B::Val, B::R>();
49+
50+
let (result, enable_count) = self.inner().dyn_accumulate_with_enable_count(&factories);
51+
52+
(unsafe { result.transmute_payload() }, enable_count)
53+
}
54+
3855
#[track_caller]
3956
pub fn accumulate_apply2<B2, F, T>(
4057
&self,

0 commit comments

Comments (0)