Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions crates/dbsp/src/circuit/circuit_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,9 @@ pub trait Node: Any {
/// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates.
fn start_compaction(&mut self);

/// Call [`Operator::is_compaction_complete`](super::operator_traits::Operator::is_compaction_complete) on the operator this node encapsulates.
///
/// Returns `true` once compaction for this node is complete. Takes `&self`,
/// so querying the status does not require mutable access to the node.
fn is_compaction_complete(&self) -> bool;

/// Place operator in the replay mode.
///
/// In the replay mode the operator streams its stored state to a temporary
Expand Down Expand Up @@ -1885,6 +1888,10 @@ pub trait CircuitBase: 'static {
fn rebalance(&self);

fn start_compaction(&self);

/// Returns `true` when all operators' background compaction has fully
/// converged.
///
/// Status query that pairs with [`start_compaction`](Self::start_compaction):
/// it takes `&self` and reports a single boolean for the whole circuit.
fn is_compaction_complete(&self) -> bool;
}

/// The circuit interface. All DBSP computation takes place within a circuit.
Expand Down Expand Up @@ -3595,6 +3602,15 @@ where
Ok(())
});
}

/// Reports whether every local node of this circuit has finished compaction.
///
/// Each local node is queried in turn (no early exit) and the answers are
/// AND-ed together. The result of `map_local_nodes` itself is discarded:
/// the visitor closure always returns `Ok(())`.
fn is_compaction_complete(&self) -> bool {
    let mut all_done = true;
    let _ = self.map_local_nodes(&mut |node| {
        // Query the node unconditionally, then fold its answer into the total.
        let node_done = node.is_compaction_complete();
        all_done = all_done && node_done;
        Ok(())
    });
    all_done
}
}

impl<P, T> Circuit for ChildCircuit<P, T>
Expand Down Expand Up @@ -4775,6 +4791,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -4926,6 +4946,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5091,6 +5115,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5249,6 +5277,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5464,6 +5496,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5655,6 +5691,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5870,6 +5910,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6059,6 +6103,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6269,6 +6317,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6464,6 +6516,10 @@ where
self.operator.start_compaction()
}

/// Reports whether this node's operator has finished compaction by
/// forwarding to the operator's own `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6649,6 +6705,10 @@ where
self.operator.borrow_mut().start_compaction()
}

/// Reports whether this node's operator has finished compaction.
///
/// The operator is reached through `borrow()` (the mutating methods of this
/// node use `borrow_mut()`), and the call is forwarded to the operator's own
/// `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.operator.borrow().is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().clear_state()
}
Expand Down Expand Up @@ -6816,6 +6876,10 @@ where

fn start_compaction(&mut self) {}

/// Always reports compaction as complete; this node's `start_compaction`
/// is likewise a no-op.
fn is_compaction_complete(&self) -> bool {
true
}

fn clear_state(&mut self) -> Result<(), DbspError> {
Ok(())
}
Expand Down Expand Up @@ -7048,6 +7112,10 @@ where
self.circuit.start_compaction();
}

/// Reports whether compaction is complete for the circuit held by this node
/// by forwarding to `self.circuit`'s `is_compaction_complete`.
fn is_compaction_complete(&self) -> bool {
self.circuit.is_compaction_complete()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.circuit
.map_local_nodes_mut(&mut |node| node.clear_state())
Expand Down Expand Up @@ -7839,6 +7907,10 @@ impl CircuitHandle {
pub fn start_compaction(&self) {
self.circuit.start_compaction()
}

/// Returns `true` when the circuit behind this handle reports that
/// compaction is complete.
///
/// Forwards to the circuit's own `is_compaction_complete`; this is the
/// status query that pairs with [`start_compaction`](Self::start_compaction).
pub fn is_compaction_complete(&self) -> bool {
self.circuit.is_compaction_complete()
}
}

pin_project! {
Expand Down
135 changes: 135 additions & 0 deletions crates/dbsp/src/circuit/dbsp_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,15 @@ impl Runtime {
return;
}
}
Ok(Command::IsCompactionComplete) => {
let complete = circuit.is_compaction_complete();
if status_sender
.send(Ok(Response::IsCompactionComplete(complete)))
.is_err()
{
return;
}
}
// Nothing to do: do some housekeeping and relinquish the CPU if there's none
// left.
Err(TryRecvError::Empty) => {
Expand Down Expand Up @@ -976,6 +985,7 @@ enum Command {
Rebalance,
SetAutoRebalance(bool),
StartCompaction,
IsCompactionComplete,
}

impl Debug for Command {
Expand Down Expand Up @@ -1018,6 +1028,7 @@ impl Debug for Command {
f.debug_tuple("SetAutoRebalance").field(enable).finish()
}
Command::StartCompaction => write!(f, "StartCompaction"),
Command::IsCompactionComplete => write!(f, "IsCompactionComplete"),
}
}
}
Expand All @@ -1027,6 +1038,7 @@ enum Response {
Unit,
CommitComplete(bool),
BootstrapComplete(bool),
IsCompactionComplete(bool),
CommitProgress(CommitProgress),
ProfileDump(Graph),
Profile(WorkerProfile),
Expand Down Expand Up @@ -1740,6 +1752,59 @@ impl DBSPHandle {
self.broadcast_command(Command::StartCompaction, |_, _| {})?;
Ok(())
}

/// Reports whether background compaction has fully converged on every
/// worker, i.e. all compaction requests have been processed, no merge is in
/// progress, and each spine has been reduced to at most one batch.
///
/// The query is broadcast to all workers and the answer is `true` only if
/// no worker reports pending compaction. Responses of any other kind are
/// ignored.
///
/// This is a non-blocking point-in-time snapshot. To wait for compaction to
/// finish, call [`wait_for_compaction`](Self::wait_for_compaction) or poll
/// this method.
///
/// # Errors
///
/// Returns the error produced by broadcasting the command to the workers.
pub fn is_compaction_complete(&mut self) -> Result<bool, DbspError> {
    // Tracks whether at least one worker still has compaction outstanding.
    let mut any_pending = false;
    self.broadcast_command(Command::IsCompactionComplete, |_, response| {
        if let Response::IsCompactionComplete(done) = response {
            any_pending |= !done;
        }
    })?;
    Ok(!any_pending)
}

/// Block until background compaction has fully converged on every worker,
/// or until `timeout` elapses.
///
/// Polls [`is_compaction_complete`](Self::is_compaction_complete) with
/// exponential back-off, starting at 1 ms and doubling each iteration up
/// to a cap of 1 s. The final sleep is shortened so the call does not
/// overshoot the deadline.
///
/// Completion is always checked at least once, so a zero `timeout` acts as
/// a single non-blocking probe. A `timeout` so large that the deadline
/// cannot be represented (for example `Duration::MAX`) is treated as "no
/// deadline" instead of panicking.
///
/// # Errors
///
/// Returns an error if the circuit fails while being queried or if the
/// timeout expires before compaction completes.
pub fn wait_for_compaction(
    &mut self,
    timeout: std::time::Duration,
) -> Result<(), anyhow::Error> {
    use std::thread;
    use std::time::Instant;

    const INITIAL_SLEEP_MS: u64 = 1;
    const MAX_SLEEP_MS: u64 = 1_000;

    // `Instant + Duration` panics on overflow; `checked_add` yields `None`
    // instead, which we interpret as an unbounded wait.
    let deadline = Instant::now().checked_add(timeout);
    let mut sleep_ms = INITIAL_SLEEP_MS;

    loop {
        if self.is_compaction_complete()? {
            return Ok(());
        }

        let backoff = std::time::Duration::from_millis(sleep_ms);
        let sleep = match deadline {
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    anyhow::bail!("timed out after {timeout:?} waiting for compaction to complete");
                }
                // Never sleep past the deadline.
                backoff.min(remaining)
            }
            None => backoff,
        };
        thread::sleep(sleep);

        // `sleep_ms` never exceeds `MAX_SLEEP_MS`, so doubling cannot overflow.
        sleep_ms = (sleep_ms * 2).min(MAX_SLEEP_MS);
    }
}
}

impl Drop for DBSPHandle {
Expand Down Expand Up @@ -2651,4 +2716,74 @@ pub(crate) mod tests {
dbsp.kill().unwrap();
}
}

/// Checks that `wait_for_compaction` returns after `start_compaction` and
/// that real merging took place: at least one spine holds more than one
/// batch beforehand, and every spine holds at most one batch afterwards.
///
/// Uses in-memory storage so the test finishes quickly.
#[test]
fn test_is_compaction_complete() {
use crate::circuit::GlobalNodeId;
use crate::circuit::metadata::{MetaItem, SPINE_BATCHES_COUNT};
use crate::utils::Tup2;
use std::time::Duration;

const BATCHES: usize = 30;
const RECORDS_PER_BATCH: i32 = 500;

// Two-worker circuit: an indexed Z-set input is sharded and integrated
// into a trace, which is the state the compaction calls below act on.
let (mut dbsp, input_handle) =
Runtime::init_circuit(CircuitConfig::with_workers(2), |circuit| {
let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
stream.shard().integrate_trace();
Ok(handle)
})
.unwrap();

// Feed enough batches to give each spine more than one batch to merge.
// The first tuple component, `batch * RECORDS_PER_BATCH + r`, is distinct
// for every record across all batches; one transaction runs per batch.
for batch in 0..BATCHES as i32 {
let mut tuples: Vec<_> = (0..RECORDS_PER_BATCH)
.map(|r| Tup2(batch * RECORDS_PER_BATCH + r, Tup2(r, 1)))
.collect();
input_handle.append(&mut tuples);
dbsp.transaction().unwrap();
}

// Collect per-operator spine batch counts from the profile.
// Entries attributed to the root node id are skipped; any metadata value
// other than `MetaItem::Count` fails the test with a panic.
// NOTE(review): presumably the root entry is not a per-spine count — confirm.
let batch_counts = |dbsp: &mut DBSPHandle| -> Vec<usize> {
let root = GlobalNodeId::root();
dbsp.retrieve_profile()
.unwrap()
.worker_profiles
.iter()
.flat_map(|p| p.attribute_profile(&SPINE_BATCHES_COUNT))
.filter_map(|(id, value)| {
(id != root).then(|| match value {
MetaItem::Count(n) => n,
other => panic!("unexpected MetaItem: {other:?}"),
})
})
.collect()
};

// Before compaction there must be at least one spine with more than one
// batch, confirming that actual merging work is needed.
let counts_before = batch_counts(&mut dbsp);
assert!(
counts_before.iter().any(|&n| n > 1),
"expected at least one spine with >1 batch before compaction, got {counts_before:?}"
);

// Request a full compaction and block until all spines converge.
// The 60 s timeout turns a compaction that never converges into a test
// failure (via `unwrap`) instead of a hang.
dbsp.start_compaction().unwrap();
dbsp.wait_for_compaction(Duration::from_secs(60)).unwrap();

// After convergence every spine must have at most one batch.
let counts_after = batch_counts(&mut dbsp);
assert!(
counts_after.iter().all(|&n| n <= 1),
"expected all spines to have <=1 batch after compaction, got {counts_after:?}"
);

dbsp.kill().unwrap();
}
}
6 changes: 6 additions & 0 deletions crates/dbsp/src/circuit/operator_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ pub trait Operator: 'static {
///
/// Only defined for operators that support compaction. No-op for all other operators.
fn start_compaction(&mut self) {}

/// Returns `true` when the operator's background compaction has fully
/// converged. Operators without a trace always return `true`.
///
/// This default implementation returns `true` unconditionally, matching the
/// no-op default of `start_compaction`.
fn is_compaction_complete(&self) -> bool {
true
}
}

/// A source operator that injects data from the outside world or from the
Expand Down
6 changes: 6 additions & 0 deletions crates/dbsp/src/operator/dynamic/accumulate_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,12 @@ where
trace.initiate_compaction()
}
}

/// Reports whether this operator's trace has finished compaction.
///
/// When no trace is currently held there is nothing to compact, so the
/// answer is `true`; otherwise the question is forwarded to the trace.
fn is_compaction_complete(&self) -> bool {
    match self.trace.as_ref() {
        Some(trace) => trace.is_compaction_complete(),
        None => true,
    }
}
}

impl<C, B, T> StrictOperator<T> for AccumulateZ1Trace<C, B, T>
Expand Down
Loading
Loading