Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ pub type SyncCheckpointCallbackFn = Box<dyn FnOnce(Result<(), Arc<ControllerErro
/// Rebalance callback argument to [`Controller::rebalance`].
pub type RebalanceCallbackFn = Box<dyn FnOnce(Result<(), ControllerError>) + Send>;

pub type StartCompactionCallbackFn = Box<dyn FnOnce(Result<(), ControllerError>) + Send>;

/// A command that [Controller] can send to [Controller::circuit_thread].
///
/// There is no type for a command reply. Instead, the command implementation
Expand All @@ -481,6 +483,7 @@ enum Command {
Suspend(SuspendCallbackFn),
SyncCheckpoint((uuid::Uuid, SyncCheckpointCallbackFn)),
Rebalance(RebalanceCallbackFn),
StartCompaction(StartCompactionCallbackFn),
}

impl Command {
Expand All @@ -496,6 +499,7 @@ impl Command {
callback(Err(Arc::new(ControllerError::ControllerExit)))
}
Command::Rebalance(callback) => callback(Err(ControllerError::ControllerExit)),
Command::StartCompaction(callback) => callback(Err(ControllerError::ControllerExit)),
}
}
}
Expand Down Expand Up @@ -1989,6 +1993,19 @@ impl Controller {
Ok(())
}

pub async fn start_compaction(&self) -> Result<(), ControllerError> {
let (sender, receiver) = oneshot::channel();
self.inner
.send_command(Command::StartCompaction(Box::new(move |result| {
if sender.send(result).is_err() {
error!("`/start_compaction` result could not be sent");
}
})));
self.inner.circuit_thread_unparker.unpark();
receiver.await.unwrap()?;
Ok(())
}

/// Returns an object for monitoring the step that the controller has
/// completed.
pub fn step_watcher(&self) -> tokio::sync::watch::Receiver<StepStatus> {
Expand Down Expand Up @@ -3332,6 +3349,11 @@ impl CircuitThread {
.rebalance()
.map_err(ControllerError::dbsp_error),
),
Command::StartCompaction(reply_callback) => reply_callback(
self.circuit
.start_compaction()
.map_err(ControllerError::dbsp_error),
),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/adapters/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,7 @@ where
.service(reset_output_endpoint)
.service(reset_status)
.service(rebalance)
.service(start_compaction)
.service(coordination_activate_handler)
.service(coordination_status)
.service(coordination_step_request)
Expand Down Expand Up @@ -2444,6 +2445,12 @@ async fn rebalance(state: WebData<ServerState>) -> Result<HttpResponse, Pipeline
Ok(HttpResponse::Ok().into())
}

#[post("/start_compaction")]
async fn start_compaction(state: WebData<ServerState>) -> Result<HttpResponse, PipelineError> {
state.controller()?.start_compaction().await?;
Ok(HttpResponse::Ok().into())
}

#[post("/coordination/activate")]
async fn coordination_activate_handler(
state: WebData<ServerState>,
Expand Down
66 changes: 66 additions & 0 deletions crates/dbsp/src/circuit/circuit_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,9 @@ pub trait Node: Any {
/// from a checkpoint must be backfilled from clean state.
fn clear_state(&mut self) -> Result<(), DbspError>;

/// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates.
fn start_compaction(&mut self);

/// Place operator in the replay mode.
///
/// In the replay mode the operator streams its stored state to a temporary
Expand Down Expand Up @@ -1836,6 +1839,8 @@ pub trait CircuitBase: 'static {
) -> Result<PartitioningPolicy, DbspError>;

fn rebalance(&self);

fn start_compaction(&self);
}

/// The circuit interface. All DBSP computation takes place within a circuit.
Expand Down Expand Up @@ -3506,6 +3511,13 @@ where
fn rebalance(&self) {
self.inner().balancer.rebalance()
}

fn start_compaction(&self) {
let _ = self.map_local_nodes_mut(&mut |node| {
node.start_compaction();
Ok(())
});
}
}

impl<P, T> Circuit for ChildCircuit<P, T>
Expand Down Expand Up @@ -4673,6 +4685,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -4820,6 +4836,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -4981,6 +5001,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5135,6 +5159,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5346,6 +5374,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5533,6 +5565,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5744,6 +5780,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -5929,6 +5969,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6135,6 +6179,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6326,6 +6374,10 @@ where
self.operator.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
Expand Down Expand Up @@ -6507,6 +6559,10 @@ where
.restore(base, self.persistent_id().as_deref())
}

fn start_compaction(&mut self) {
self.operator.borrow_mut().start_compaction()
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().clear_state()
}
Expand Down Expand Up @@ -6672,6 +6728,8 @@ where
Ok(())
}

fn start_compaction(&mut self) {}

fn clear_state(&mut self) -> Result<(), DbspError> {
Ok(())
}
Expand Down Expand Up @@ -6900,6 +6958,10 @@ where
Ok(())
}

fn start_compaction(&mut self) {
self.circuit.start_compaction();
}

fn clear_state(&mut self) -> Result<(), DbspError> {
self.circuit
.map_local_nodes_mut(&mut |node| node.clear_state())
Expand Down Expand Up @@ -7669,6 +7731,10 @@ impl CircuitHandle {
pub fn rebalance(&self) {
self.circuit.rebalance()
}

pub fn start_compaction(&self) {
self.circuit.start_compaction()
}
}

pin_project! {
Expand Down
13 changes: 13 additions & 0 deletions crates/dbsp/src/circuit/dbsp_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,12 @@ impl Runtime {
return;
}
}
Ok(Command::StartCompaction) => {
circuit.start_compaction();
if status_sender.send(Ok(Response::Unit)).is_err() {
return;
}
}
// Nothing to do: do some housekeeping and relinquish the CPU if there's none
// left.
Err(TryRecvError::Empty) => {
Expand Down Expand Up @@ -953,6 +959,7 @@ enum Command {
GetCurrentBalancerPolicy(String),
Rebalance,
SetAutoRebalance(bool),
StartCompaction,
}

impl Debug for Command {
Expand Down Expand Up @@ -994,6 +1001,7 @@ impl Debug for Command {
Command::SetAutoRebalance(enable) => {
f.debug_tuple("SetAutoRebalance").field(enable).finish()
}
Command::StartCompaction => write!(f, "StartCompaction"),
}
}
}
Expand Down Expand Up @@ -1737,6 +1745,11 @@ impl DBSPHandle {
self.broadcast_command(Command::Rebalance, |_, _| {})?;
Ok(())
}

pub fn start_compaction(&mut self) -> Result<(), DbspError> {
self.broadcast_command(Command::StartCompaction, |_, _| {})?;
Ok(())
}
}

impl Drop for DBSPHandle {
Expand Down
9 changes: 8 additions & 1 deletion crates/dbsp/src/circuit/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub const MERGING_MEMORY_RECORDS_COUNT: MetricId =
pub const MERGING_STORAGE_RECORDS_COUNT: MetricId =
MetricId(Cow::Borrowed("merging_storage_records_count"));
pub const COMPLETED_MERGES: MetricId = MetricId(Cow::Borrowed("completed_merges"));
pub const COMPACTION_STATE: MetricId = MetricId(Cow::Borrowed("compaction_state"));
pub const NEGATIVE_WEIGHT_COUNT: MetricId = MetricId(Cow::Borrowed("negative_weight_count"));
pub const BLOOM_FILTER_BITS_PER_KEY: MetricId =
MetricId(Cow::Borrowed("bloom_filter_bits_per_key"));
Expand Down Expand Up @@ -175,7 +176,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche
pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId =
MetricId(Cow::Borrowed("input_integral_records_count"));

pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [
pub const CIRCUIT_METRICS: [CircuitMetric; 75] = [
// State
CircuitMetric {
name: USED_MEMORY_BYTES,
Expand Down Expand Up @@ -369,6 +370,12 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [
advanced: true,
description: "Information about the batches that were compacted (merged).",
},
CircuitMetric {
name: COMPACTION_STATE,
category: CircuitMetricCategory::State,
advanced: true,
description: "State of the compaction process.",
},
// Inputs
CircuitMetric {
name: INPUT_RECORDS_COUNT,
Expand Down
5 changes: 5 additions & 0 deletions crates/dbsp/src/circuit/operator_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ pub trait Operator: 'static {
fn flush_progress(&self) -> Option<Position> {
None
}

/// Start compaction of the operator's state.
///
/// Only defined for operators that support compaction. No-op for all other operators.
fn start_compaction(&mut self) {}
}

/// A source operator that injects data from the outside world or from the
Expand Down
6 changes: 6 additions & 0 deletions crates/dbsp/src/operator/dynamic/accumulate_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,12 @@ where
fn is_flush_complete(&self) -> bool {
!self.flush_output
}

fn start_compaction(&mut self) {
if let Some(trace) = self.trace.as_mut() {
trace.initiate_compaction()
}
}
}

impl<C, B, T> StrictOperator<T> for AccumulateZ1Trace<C, B, T>
Expand Down
2 changes: 2 additions & 0 deletions crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,8 @@ where
Ok(())
}

fn start_compaction(&mut self) {}

fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Expand Down
Loading
Loading