Skip to content

Commit 2d0f320

Browse files
committed
[dbsp] Compaction API.
Add a `/start_compaction` endpoint, which triggers background compaction of all traces in the circuit. Compaction starts by merging all batches at level 1 and pushing the resulting batch to level 2, regardless of its size, we then do the same at level 2, etc. Compaction does not stall the circuit. Compaction is useful for example to consolidate all negative weights in the pipeline. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent c430039 commit 2d0f320

20 files changed

Lines changed: 721 additions & 8 deletions

File tree

crates/adapters/src/controller.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,8 @@ pub type SyncCheckpointCallbackFn = Box<dyn FnOnce(Result<(), Arc<ControllerErro
470470
/// Rebalance callback argument to [`Controller::rebalance`].
471471
pub type RebalanceCallbackFn = Box<dyn FnOnce(Result<(), ControllerError>) + Send>;
472472

473+
pub type StartCompactionCallbackFn = Box<dyn FnOnce(Result<(), ControllerError>) + Send>;
474+
473475
/// A command that [Controller] can send to [Controller::circuit_thread].
474476
///
475477
/// There is no type for a command reply. Instead, the command implementation
@@ -481,6 +483,7 @@ enum Command {
481483
Suspend(SuspendCallbackFn),
482484
SyncCheckpoint((uuid::Uuid, SyncCheckpointCallbackFn)),
483485
Rebalance(RebalanceCallbackFn),
486+
StartCompaction(StartCompactionCallbackFn),
484487
}
485488

486489
impl Command {
@@ -496,6 +499,7 @@ impl Command {
496499
callback(Err(Arc::new(ControllerError::ControllerExit)))
497500
}
498501
Command::Rebalance(callback) => callback(Err(ControllerError::ControllerExit)),
502+
Command::StartCompaction(callback) => callback(Err(ControllerError::ControllerExit)),
499503
}
500504
}
501505
}
@@ -1989,6 +1993,19 @@ impl Controller {
19891993
Ok(())
19901994
}
19911995

1996+
pub async fn start_compaction(&self) -> Result<(), ControllerError> {
1997+
let (sender, receiver) = oneshot::channel();
1998+
self.inner
1999+
.send_command(Command::StartCompaction(Box::new(move |result| {
2000+
if sender.send(result).is_err() {
2001+
error!("`/start_compaction` result could not be sent");
2002+
}
2003+
})));
2004+
self.inner.circuit_thread_unparker.unpark();
2005+
receiver.await.unwrap()?;
2006+
Ok(())
2007+
}
2008+
19922009
/// Returns an object for monitoring the step that the controller has
19932010
/// completed.
19942011
pub fn step_watcher(&self) -> tokio::sync::watch::Receiver<StepStatus> {
@@ -3358,6 +3375,11 @@ impl CircuitThread {
33583375
.rebalance()
33593376
.map_err(ControllerError::dbsp_error),
33603377
),
3378+
Command::StartCompaction(reply_callback) => reply_callback(
3379+
self.circuit
3380+
.start_compaction()
3381+
.map_err(ControllerError::dbsp_error),
3382+
),
33613383
}
33623384
}
33633385
}

crates/adapters/src/server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,6 +1238,7 @@ where
12381238
.service(reset_output_endpoint)
12391239
.service(reset_status)
12401240
.service(rebalance)
1241+
.service(start_compaction)
12411242
.service(coordination_activate_handler)
12421243
.service(coordination_status)
12431244
.service(coordination_step_request)
@@ -2485,6 +2486,12 @@ async fn rebalance(state: WebData<ServerState>) -> Result<HttpResponse, Pipeline
24852486
Ok(HttpResponse::Ok().into())
24862487
}
24872488

2489+
#[post("/start_compaction")]
2490+
async fn start_compaction(state: WebData<ServerState>) -> Result<HttpResponse, PipelineError> {
2491+
state.controller()?.start_compaction().await?;
2492+
Ok(HttpResponse::Ok().into())
2493+
}
2494+
24882495
#[post("/coordination/activate")]
24892496
async fn coordination_activate_handler(
24902497
state: WebData<ServerState>,

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,9 @@ pub trait Node: Any {
10981098
/// from a checkpoint must be backfilled from clean state.
10991099
fn clear_state(&mut self) -> Result<(), DbspError>;
11001100

1101+
/// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates.
1102+
fn start_compaction(&mut self);
1103+
11011104
/// Place operator in the replay mode.
11021105
///
11031106
/// In the replay mode the operator streams its stored state to a temporary
@@ -1836,6 +1839,8 @@ pub trait CircuitBase: 'static {
18361839
) -> Result<PartitioningPolicy, DbspError>;
18371840

18381841
fn rebalance(&self);
1842+
1843+
fn start_compaction(&self);
18391844
}
18401845

18411846
/// The circuit interface. All DBSP computation takes place within a circuit.
@@ -3506,6 +3511,13 @@ where
35063511
fn rebalance(&self) {
35073512
self.inner().balancer.rebalance()
35083513
}
3514+
3515+
fn start_compaction(&self) {
3516+
let _ = self.map_local_nodes_mut(&mut |node| {
3517+
node.start_compaction();
3518+
Ok(())
3519+
});
3520+
}
35093521
}
35103522

35113523
impl<P, T> Circuit for ChildCircuit<P, T>
@@ -4673,6 +4685,10 @@ where
46734685
self.operator.restore(base, self.persistent_id().as_deref())
46744686
}
46754687

4688+
fn start_compaction(&mut self) {
4689+
self.operator.start_compaction()
4690+
}
4691+
46764692
fn clear_state(&mut self) -> Result<(), DbspError> {
46774693
self.operator.clear_state()
46784694
}
@@ -4820,6 +4836,10 @@ where
48204836
self.operator.restore(base, self.persistent_id().as_deref())
48214837
}
48224838

4839+
fn start_compaction(&mut self) {
4840+
self.operator.start_compaction()
4841+
}
4842+
48234843
fn clear_state(&mut self) -> Result<(), DbspError> {
48244844
self.operator.clear_state()
48254845
}
@@ -4981,6 +5001,10 @@ where
49815001
self.operator.restore(base, self.persistent_id().as_deref())
49825002
}
49835003

5004+
fn start_compaction(&mut self) {
5005+
self.operator.start_compaction()
5006+
}
5007+
49845008
fn clear_state(&mut self) -> Result<(), DbspError> {
49855009
self.operator.clear_state()
49865010
}
@@ -5135,6 +5159,10 @@ where
51355159
self.operator.restore(base, self.persistent_id().as_deref())
51365160
}
51375161

5162+
fn start_compaction(&mut self) {
5163+
self.operator.start_compaction()
5164+
}
5165+
51385166
fn clear_state(&mut self) -> Result<(), DbspError> {
51395167
self.operator.clear_state()
51405168
}
@@ -5346,6 +5374,10 @@ where
53465374
self.operator.restore(base, self.persistent_id().as_deref())
53475375
}
53485376

5377+
fn start_compaction(&mut self) {
5378+
self.operator.start_compaction()
5379+
}
5380+
53495381
fn clear_state(&mut self) -> Result<(), DbspError> {
53505382
self.operator.clear_state()
53515383
}
@@ -5533,6 +5565,10 @@ where
55335565
self.operator.restore(base, self.persistent_id().as_deref())
55345566
}
55355567

5568+
fn start_compaction(&mut self) {
5569+
self.operator.start_compaction()
5570+
}
5571+
55365572
fn clear_state(&mut self) -> Result<(), DbspError> {
55375573
self.operator.clear_state()
55385574
}
@@ -5744,6 +5780,10 @@ where
57445780
self.operator.restore(base, self.persistent_id().as_deref())
57455781
}
57465782

5783+
fn start_compaction(&mut self) {
5784+
self.operator.start_compaction()
5785+
}
5786+
57475787
fn clear_state(&mut self) -> Result<(), DbspError> {
57485788
self.operator.clear_state()
57495789
}
@@ -5929,6 +5969,10 @@ where
59295969
self.operator.restore(base, self.persistent_id().as_deref())
59305970
}
59315971

5972+
fn start_compaction(&mut self) {
5973+
self.operator.start_compaction()
5974+
}
5975+
59325976
fn clear_state(&mut self) -> Result<(), DbspError> {
59335977
self.operator.clear_state()
59345978
}
@@ -6135,6 +6179,10 @@ where
61356179
self.operator.restore(base, self.persistent_id().as_deref())
61366180
}
61376181

6182+
fn start_compaction(&mut self) {
6183+
self.operator.start_compaction()
6184+
}
6185+
61386186
fn clear_state(&mut self) -> Result<(), DbspError> {
61396187
self.operator.clear_state()
61406188
}
@@ -6326,6 +6374,10 @@ where
63266374
self.operator.restore(base, self.persistent_id().as_deref())
63276375
}
63286376

6377+
fn start_compaction(&mut self) {
6378+
self.operator.start_compaction()
6379+
}
6380+
63296381
fn clear_state(&mut self) -> Result<(), DbspError> {
63306382
self.operator.clear_state()
63316383
}
@@ -6507,6 +6559,10 @@ where
65076559
.restore(base, self.persistent_id().as_deref())
65086560
}
65096561

6562+
fn start_compaction(&mut self) {
6563+
self.operator.borrow_mut().start_compaction()
6564+
}
6565+
65106566
fn clear_state(&mut self) -> Result<(), DbspError> {
65116567
self.operator.borrow_mut().clear_state()
65126568
}
@@ -6672,6 +6728,8 @@ where
66726728
Ok(())
66736729
}
66746730

6731+
fn start_compaction(&mut self) {}
6732+
66756733
fn clear_state(&mut self) -> Result<(), DbspError> {
66766734
Ok(())
66776735
}
@@ -6900,6 +6958,10 @@ where
69006958
Ok(())
69016959
}
69026960

6961+
fn start_compaction(&mut self) {
6962+
self.circuit.start_compaction();
6963+
}
6964+
69036965
fn clear_state(&mut self) -> Result<(), DbspError> {
69046966
self.circuit
69056967
.map_local_nodes_mut(&mut |node| node.clear_state())
@@ -7669,6 +7731,10 @@ impl CircuitHandle {
76697731
pub fn rebalance(&self) {
76707732
self.circuit.rebalance()
76717733
}
7734+
7735+
pub fn start_compaction(&self) {
7736+
self.circuit.start_compaction()
7737+
}
76727738
}
76737739

76747740
pin_project! {

crates/dbsp/src/circuit/dbsp_handle.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,12 @@ impl Runtime {
855855
return;
856856
}
857857
}
858+
Ok(Command::StartCompaction) => {
859+
circuit.start_compaction();
860+
if status_sender.send(Ok(Response::Unit)).is_err() {
861+
return;
862+
}
863+
}
858864
// Nothing to do: do some housekeeping and relinquish the CPU if there's none
859865
// left.
860866
Err(TryRecvError::Empty) => {
@@ -947,6 +953,7 @@ enum Command {
947953
GetCurrentBalancerPolicy(String),
948954
Rebalance,
949955
SetAutoRebalance(bool),
956+
StartCompaction,
950957
}
951958

952959
impl Debug for Command {
@@ -988,6 +995,7 @@ impl Debug for Command {
988995
Command::SetAutoRebalance(enable) => {
989996
f.debug_tuple("SetAutoRebalance").field(enable).finish()
990997
}
998+
Command::StartCompaction => write!(f, "StartCompaction"),
991999
}
9921000
}
9931001
}
@@ -1719,6 +1727,11 @@ impl DBSPHandle {
17191727
self.broadcast_command(Command::Rebalance, |_, _| {})?;
17201728
Ok(())
17211729
}
1730+
1731+
pub fn start_compaction(&mut self) -> Result<(), DbspError> {
1732+
self.broadcast_command(Command::StartCompaction, |_, _| {})?;
1733+
Ok(())
1734+
}
17221735
}
17231736

17241737
impl Drop for DBSPHandle {

crates/dbsp/src/circuit/metadata.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ pub const MERGING_MEMORY_RECORDS_COUNT: MetricId =
127127
pub const MERGING_STORAGE_RECORDS_COUNT: MetricId =
128128
MetricId(Cow::Borrowed("merging_storage_records_count"));
129129
pub const COMPLETED_MERGES: MetricId = MetricId(Cow::Borrowed("completed_merges"));
130+
pub const COMPACTION_STATE: MetricId = MetricId(Cow::Borrowed("compaction_state"));
130131
pub const NEGATIVE_WEIGHT_COUNT: MetricId = MetricId(Cow::Borrowed("negative_weight_count"));
131132
pub const BLOOM_FILTER_BITS_PER_KEY: MetricId =
132133
MetricId(Cow::Borrowed("bloom_filter_bits_per_key"));
@@ -175,7 +176,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche
175176
pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId =
176177
MetricId(Cow::Borrowed("input_integral_records_count"));
177178

178-
pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [
179+
pub const CIRCUIT_METRICS: [CircuitMetric; 75] = [
179180
// State
180181
CircuitMetric {
181182
name: USED_MEMORY_BYTES,
@@ -369,6 +370,12 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [
369370
advanced: true,
370371
description: "Information about the batches that were compacted (merged).",
371372
},
373+
CircuitMetric {
374+
name: COMPACTION_STATE,
375+
category: CircuitMetricCategory::State,
376+
advanced: true,
377+
description: "State of the compaction process.",
378+
},
372379
// Inputs
373380
CircuitMetric {
374381
name: INPUT_RECORDS_COUNT,

crates/dbsp/src/circuit/operator_traits.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,11 @@ pub trait Operator: 'static {
305305
fn flush_progress(&self) -> Option<Position> {
306306
None
307307
}
308+
309+
/// Start compaction of the operator's state.
310+
///
311+
/// Only defined for operators that support compaction. No-op for all other operators.
312+
fn start_compaction(&mut self) {}
308313
}
309314

310315
/// A source operator that injects data from the outside world or from the

crates/dbsp/src/operator/dynamic/accumulate_trace.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,12 @@ where
11011101
fn is_flush_complete(&self) -> bool {
11021102
!self.flush_output
11031103
}
1104+
1105+
fn start_compaction(&mut self) {
1106+
if let Some(trace) = self.trace.as_mut() {
1107+
trace.initiate_compaction()
1108+
}
1109+
}
11041110
}
11051111

11061112
impl<C, B, T> StrictOperator<T> for AccumulateZ1Trace<C, B, T>

crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,8 @@ where
862862
Ok(())
863863
}
864864

865+
fn start_compaction(&mut self) {}
866+
865867
fn eval<'a>(
866868
&'a mut self,
867869
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {

0 commit comments

Comments
 (0)