diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 49ce170204b..0ca7059c2f4 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -470,6 +470,8 @@ pub type SyncCheckpointCallbackFn = Box) + Send>; +pub type StartCompactionCallbackFn = Box) + Send>; + /// A command that [Controller] can send to [Controller::circuit_thread]. /// /// There is no type for a command reply. Instead, the command implementation @@ -481,6 +483,7 @@ enum Command { Suspend(SuspendCallbackFn), SyncCheckpoint((uuid::Uuid, SyncCheckpointCallbackFn)), Rebalance(RebalanceCallbackFn), + StartCompaction(StartCompactionCallbackFn), } impl Command { @@ -496,6 +499,7 @@ impl Command { callback(Err(Arc::new(ControllerError::ControllerExit))) } Command::Rebalance(callback) => callback(Err(ControllerError::ControllerExit)), + Command::StartCompaction(callback) => callback(Err(ControllerError::ControllerExit)), } } } @@ -1989,6 +1993,19 @@ impl Controller { Ok(()) } + pub async fn start_compaction(&self) -> Result<(), ControllerError> { + let (sender, receiver) = oneshot::channel(); + self.inner + .send_command(Command::StartCompaction(Box::new(move |result| { + if sender.send(result).is_err() { + error!("`/start_compaction` result could not be sent"); + } + }))); + self.inner.circuit_thread_unparker.unpark(); + receiver.await.unwrap()?; + Ok(()) + } + /// Returns an object for monitoring the step that the controller has /// completed. 
pub fn step_watcher(&self) -> tokio::sync::watch::Receiver { @@ -3332,6 +3349,11 @@ impl CircuitThread { .rebalance() .map_err(ControllerError::dbsp_error), ), + Command::StartCompaction(reply_callback) => reply_callback( + self.circuit + .start_compaction() + .map_err(ControllerError::dbsp_error), + ), } } } diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index 22027506c61..fb5ffc05dd9 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -1238,6 +1238,7 @@ where .service(reset_output_endpoint) .service(reset_status) .service(rebalance) + .service(start_compaction) .service(coordination_activate_handler) .service(coordination_status) .service(coordination_step_request) @@ -2444,6 +2445,12 @@ async fn rebalance(state: WebData) -> Result) -> Result { + state.controller()?.start_compaction().await?; + Ok(HttpResponse::Ok().into()) +} + #[post("/coordination/activate")] async fn coordination_activate_handler( state: WebData, diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index bb8895e23f1..619bb2d0a94 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -1098,6 +1098,9 @@ pub trait Node: Any { /// from a checkpoint must be backfilled from clean state. fn clear_state(&mut self) -> Result<(), DbspError>; + /// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates. + fn start_compaction(&mut self); + /// Place operator in the replay mode. /// /// In the replay mode the operator streams its stored state to a temporary @@ -1836,6 +1839,8 @@ pub trait CircuitBase: 'static { ) -> Result; fn rebalance(&self); + + fn start_compaction(&self); } /// The circuit interface. All DBSP computation takes place within a circuit. 
@@ -3506,6 +3511,13 @@ where fn rebalance(&self) { self.inner().balancer.rebalance() } + + fn start_compaction(&self) { + let _ = self.map_local_nodes_mut(&mut |node| { + node.start_compaction(); + Ok(()) + }); + } } impl Circuit for ChildCircuit @@ -4673,6 +4685,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -4820,6 +4836,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -4981,6 +5001,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5135,6 +5159,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5346,6 +5374,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5533,6 +5565,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5744,6 +5780,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { 
self.operator.clear_state() } @@ -5929,6 +5969,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -6135,6 +6179,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -6326,6 +6374,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -6507,6 +6559,10 @@ where .restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.borrow_mut().start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.borrow_mut().clear_state() } @@ -6672,6 +6728,8 @@ where Ok(()) } + fn start_compaction(&mut self) {} + fn clear_state(&mut self) -> Result<(), DbspError> { Ok(()) } @@ -6900,6 +6958,10 @@ where Ok(()) } + fn start_compaction(&mut self) { + self.circuit.start_compaction(); + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.circuit .map_local_nodes_mut(&mut |node| node.clear_state()) @@ -7669,6 +7731,10 @@ impl CircuitHandle { pub fn rebalance(&self) { self.circuit.rebalance() } + + pub fn start_compaction(&self) { + self.circuit.start_compaction() + } } pin_project! 
{ diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 048ffaea610..82daa97cea7 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -860,6 +860,12 @@ impl Runtime { return; } } + Ok(Command::StartCompaction) => { + circuit.start_compaction(); + if status_sender.send(Ok(Response::Unit)).is_err() { + return; + } + } // Nothing to do: do some housekeeping and relinquish the CPU if there's none // left. Err(TryRecvError::Empty) => { @@ -953,6 +959,7 @@ enum Command { GetCurrentBalancerPolicy(String), Rebalance, SetAutoRebalance(bool), + StartCompaction, } impl Debug for Command { @@ -994,6 +1001,7 @@ impl Debug for Command { Command::SetAutoRebalance(enable) => { f.debug_tuple("SetAutoRebalance").field(enable).finish() } + Command::StartCompaction => write!(f, "StartCompaction"), } } } @@ -1737,6 +1745,11 @@ impl DBSPHandle { self.broadcast_command(Command::Rebalance, |_, _| {})?; Ok(()) } + + pub fn start_compaction(&mut self) -> Result<(), DbspError> { + self.broadcast_command(Command::StartCompaction, |_, _| {})?; + Ok(()) + } } impl Drop for DBSPHandle { diff --git a/crates/dbsp/src/circuit/metadata.rs b/crates/dbsp/src/circuit/metadata.rs index d0c3001b97b..da46f24dd38 100644 --- a/crates/dbsp/src/circuit/metadata.rs +++ b/crates/dbsp/src/circuit/metadata.rs @@ -127,6 +127,7 @@ pub const MERGING_MEMORY_RECORDS_COUNT: MetricId = pub const MERGING_STORAGE_RECORDS_COUNT: MetricId = MetricId(Cow::Borrowed("merging_storage_records_count")); pub const COMPLETED_MERGES: MetricId = MetricId(Cow::Borrowed("completed_merges")); +pub const COMPACTION_STATE: MetricId = MetricId(Cow::Borrowed("compaction_state")); pub const NEGATIVE_WEIGHT_COUNT: MetricId = MetricId(Cow::Borrowed("negative_weight_count")); pub const BLOOM_FILTER_BITS_PER_KEY: MetricId = MetricId(Cow::Borrowed("bloom_filter_bits_per_key")); @@ -175,7 +176,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = 
MetricId(Cow::Borrowed("prefix_batche pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId = MetricId(Cow::Borrowed("input_integral_records_count")); -pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [ +pub const CIRCUIT_METRICS: [CircuitMetric; 75] = [ // State CircuitMetric { name: USED_MEMORY_BYTES, @@ -369,6 +370,12 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [ advanced: true, description: "Information about the batches that were compacted (merged).", }, + CircuitMetric { + name: COMPACTION_STATE, + category: CircuitMetricCategory::State, + advanced: true, + description: "State of the compaction process.", + }, // Inputs CircuitMetric { name: INPUT_RECORDS_COUNT, diff --git a/crates/dbsp/src/circuit/operator_traits.rs b/crates/dbsp/src/circuit/operator_traits.rs index 8036fb423ea..da57e96ef9e 100644 --- a/crates/dbsp/src/circuit/operator_traits.rs +++ b/crates/dbsp/src/circuit/operator_traits.rs @@ -305,6 +305,11 @@ pub trait Operator: 'static { fn flush_progress(&self) -> Option { None } + + /// Start compaction of the operator's state. + /// + /// Only defined for operators that support compaction. No-op for all other operators. 
+ fn start_compaction(&mut self) {} } /// A source operator that injects data from the outside world or from the diff --git a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs index a89afef8956..829be69c111 100644 --- a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs +++ b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs @@ -1101,6 +1101,12 @@ where fn is_flush_complete(&self) -> bool { !self.flush_output } + + fn start_compaction(&mut self) { + if let Some(trace) = self.trace.as_mut() { + trace.initiate_compaction() + } + } } impl StrictOperator for AccumulateZ1Trace diff --git a/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs b/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs index 7f51d843f1c..ce2414876de 100644 --- a/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs +++ b/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs @@ -862,6 +862,8 @@ where Ok(()) } + fn start_compaction(&mut self) {} + fn eval<'a>( &'a mut self, ) -> Pin, SchedulerError>> + 'a>> { diff --git a/crates/dbsp/src/operator/dynamic/trace.rs b/crates/dbsp/src/operator/dynamic/trace.rs index 43a83d4eb32..2d90686a3f2 100644 --- a/crates/dbsp/src/operator/dynamic/trace.rs +++ b/crates/dbsp/src/operator/dynamic/trace.rs @@ -1168,6 +1168,12 @@ where fn is_flush_complete(&self) -> bool { !self.flush_output } + + fn start_compaction(&mut self) { + if let Some(trace) = self.trace.as_mut() { + trace.initiate_compaction() + } + } } impl StrictOperator for Z1Trace @@ -1303,12 +1309,18 @@ mod test { use std::{ cmp::max, collections::{BTreeMap, BTreeSet}, + thread, + time::{Duration, Instant}, }; use crate::{ OrdIndexedZSet, Runtime, Stream, TypedBox, ZWeight, + circuit::{ + CircuitConfig, GlobalNodeId, + metadata::{MetaItem, SPINE_BATCHES_COUNT}, + }, dynamic::DynData, - typed_batch::{self, Spine}, + typed_batch::{self, IndexedZSetReader, Spine}, utils::Tup2, }; use proptest::{collection::vec, prelude::*}; @@ 
-1643,6 +1655,196 @@ mod test { test_integrate_trace_retain(batches, 10, 10); } + fn spine_batches_count_values(profile: &crate::profile::DbspProfile) -> Vec<(String, usize)> { + let root = GlobalNodeId::root(); + let operator_names = spine_operator_names(profile); + + // The root profile entry merges child metrics, so only inspect individual operators. + let mut counts = profile + .worker_profiles + .iter() + .flat_map(|profile| profile.attribute_profile(&SPINE_BATCHES_COUNT)) + .filter_map(|(node_id, value)| { + (node_id != root).then(|| match value { + MetaItem::Count(count) => { + let operator_name = operator_names + .get(&node_id.node_identifier()) + .cloned() + .unwrap_or_else(|| node_id.to_string()); + (operator_name, count) + } + value => panic!("spine_batches_count must be serialized as a count: {value:?}"), + }) + }) + .collect::>(); + + counts.sort(); + counts + } + + fn spine_operator_names(profile: &crate::profile::DbspProfile) -> BTreeMap { + let mut operator_names = BTreeMap::new(); + + if let Some(graph) = &profile.graph { + let graph_json: serde_json::Value = serde_json::from_str(&graph.to_json()).unwrap(); + collect_operator_names(&graph_json, &mut operator_names); + } + + operator_names + } + + fn collect_operator_names( + value: &serde_json::Value, + operator_names: &mut BTreeMap, + ) { + match value { + serde_json::Value::Object(object) => { + if let (Some(id), Some(label)) = ( + object.get("id").and_then(serde_json::Value::as_str), + object.get("label").and_then(serde_json::Value::as_str), + ) { + let name = label + .split("\\l") + .next() + .unwrap_or(label) + .split(" @ ") + .next() + .unwrap_or(label); + operator_names.insert(id.to_owned(), name.to_owned()); + } + + for value in object.values() { + collect_operator_names(value, operator_names); + } + } + serde_json::Value::Array(array) => { + for value in array { + collect_operator_names(value, operator_names); + } + } + _ => {} + } + } + + #[test] + fn 
test_start_compaction_accumulate_integrate_trace() { + const NUM_TRACES: usize = 3; + const ITERATIONS: usize = 3; + const BATCHES_PER_ITERATION: usize = 20; + const TOTAL_BATCHES: usize = BATCHES_PER_ITERATION * ITERATIONS * (ITERATIONS + 1) / 2; + const RECORDS_PER_BATCH: usize = 2000; + + let storage_dir = tempfile::tempdir().unwrap(); + let circuit_config = + CircuitConfig::with_workers(2).with_temporary_storage(storage_dir.path()); + let (mut dbsp, (input_handles, trace_outputs)) = + Runtime::init_circuit(circuit_config, move |circuit| { + // Build several independent trace integrals so one compaction request has to visit + // multiple `accumulate_integrate_trace` operators in a storage-enabled circuit. + let mut input_handles = Vec::new(); + let mut trace_outputs = Vec::new(); + + for _ in 0..NUM_TRACES { + let (stream, handle) = circuit.add_input_indexed_zset::(); + let trace = stream.shard().accumulate_integrate_trace(); + let output = trace + .apply(|trace| trace.ro_snapshot().consolidate()) + .output(); + input_handles.push(handle); + trace_outputs.push(output); + } + + Ok((input_handles, trace_outputs)) + }) + .unwrap(); + + let mut next_batch = 0; + for iteration in 0..ITERATIONS { + // Each iteration appends more batches than the previous one. This exercises + // repeated compaction on traces that keep receiving additional input. + let batches_this_iteration = (iteration + 1) * BATCHES_PER_ITERATION; + let end_batch = next_batch + batches_this_iteration; + + for batch in next_batch..end_batch { + for (trace, input_handle) in input_handles.iter().enumerate() { + let mut tuples = Vec::with_capacity(RECORDS_PER_BATCH); + for record in 0..RECORDS_PER_BATCH { + // Use disjoint key ranges per trace so the output validation can + // reconstruct the expected integral exactly. 
+ let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH + + batch * RECORDS_PER_BATCH + + record) as i32; + tuples.push(Tup2(key, Tup2(record as i32, 1))); + } + + input_handle.append(&mut tuples); + } + + dbsp.transaction().unwrap(); + } + + next_batch = end_batch; + + let profile = dbsp.retrieve_profile().unwrap(); + let counts = spine_batches_count_values(&profile); + println!( + "SPINE_BATCHES_COUNT values before compaction iteration {iteration}: {counts:?}" + ); + assert!( + !counts.is_empty(), + "profile should contain at least one SPINE_BATCHES_COUNT metric" + ); + + dbsp.start_compaction().unwrap(); + thread::sleep(Duration::from_millis(100)); + // Starting compaction again while the previous request is still making progress + // should be idempotent. + dbsp.start_compaction().unwrap(); + + let deadline = Instant::now() + Duration::from_secs(60); + loop { + let profile = dbsp.retrieve_profile().unwrap(); + let counts = spine_batches_count_values(&profile); + println!( + "SPINE_BATCHES_COUNT values after compaction request iteration {iteration}: {counts:?}" + ); + assert!( + !counts.is_empty(), + "profile should contain at least one SPINE_BATCHES_COUNT metric" + ); + + if counts.iter().all(|(_, count)| *count <= 1) { + break; + } + + assert!( + Instant::now() < deadline, + "timed out waiting for all SPINE_BATCHES_COUNT values to be <= 1: {counts:?}" + ); + thread::sleep(Duration::from_secs(5)); + } + + // After all spines report at most one batch, run one more logical step to + // materialize the trace outputs and verify compaction preserved every record. 
+ dbsp.transaction().unwrap(); + for (trace, output) in trace_outputs.iter().enumerate() { + let output = output.consolidate(); + let mut actual_tuples = output.iter(); + + for batch in 0..next_batch { + for record in 0..RECORDS_PER_BATCH { + let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH + + batch * RECORDS_PER_BATCH + + record) as i32; + assert_eq!(actual_tuples.next(), Some((key, record as i32, 1))); + } + } + + assert_eq!(actual_tuples.next(), None); + } + } + } + proptest! { #![proptest_config(ProptestConfig::with_cases(16))] diff --git a/crates/dbsp/src/trace.rs b/crates/dbsp/src/trace.rs index 607b6ced249..a397c473325 100644 --- a/crates/dbsp/src/trace.rs +++ b/crates/dbsp/src/trace.rs @@ -313,6 +313,8 @@ pub trait Trace: BatchReader { /// Allows the trace to report additional metadata. fn metadata(&self, _meta: &mut OperatorMeta) {} + + fn initiate_compaction(&self); } /// Where a batch is stored. diff --git a/crates/dbsp/src/trace/spine_async.rs b/crates/dbsp/src/trace/spine_async.rs index 7c46fc5afa7..4bf73ed7f94 100644 --- a/crates/dbsp/src/trace/spine_async.rs +++ b/crates/dbsp/src/trace/spine_async.rs @@ -12,7 +12,7 @@ use crate::{ max_level0_batch_size_records, metadata::{ BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_HIT_RATE_PERCENT, BLOOM_FILTER_HITS_COUNT, - BLOOM_FILTER_MISSES_COUNT, BLOOM_FILTER_SIZE_BYTES, COMPLETED_MERGES, + BLOOM_FILTER_MISSES_COUNT, BLOOM_FILTER_SIZE_BYTES, COMPACTION_STATE, COMPLETED_MERGES, LOOSE_BATCHES_COUNT, LOOSE_MEMORY_RECORDS_COUNT, LOOSE_STORAGE_RECORDS_COUNT, MERGE_BACKPRESSURE_WAIT_TIME_SECONDS, MERGE_REDUCTION_PERCENT, MERGING_BATCHES_COUNT, MERGING_MEMORY_RECORDS_COUNT, MERGING_SIZE_BYTES, MERGING_STORAGE_RECORDS_COUNT, @@ -134,6 +134,53 @@ impl From<(Vec, &Spine)> for CommittedSpine { } } +/// State of the compaction process in a slot. 
+/// +/// ```text +/// │ +/// ▼ +/// ┌──────┐ ┌─────────┐ +/// │ none ├───────────────────────────►│requested│ +/// └──────┘ └────┬────┘ +/// ▲ │ +/// │ │ +/// │ │ +/// │ │ +/// │ ┌───────────┐ │ +/// └─────────┤in progress│◄──────────────┘ +/// └───────────┘ +/// ``` +/// +/// * none -> requested: +/// - triggered by the `initiate_compaction` method in the first non-empty slot of the spine. +/// - triggered when compaction completes in the previous slot. +/// * requested -> in progress: +/// - triggered when the slot finishes processing any ongoing merge and discovers that +/// compaction status has been set to requested. It starts a new merge including _all_ +/// batches at the current level. +/// * in progress -> none: +/// - the merge completes; the resulting batch is pushed to the next slot regardless of +/// its size; compaction is initiated at the next slot. +/// * requested -> none: +/// - there is at most one batch in the current slot. Compaction completes instantly and the +/// batch is pushed to the next slot; compaction is initiated at the next slot. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CompactionStatus { + None, + Requested, + InProgress, +} + +impl Display for CompactionStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CompactionStatus::None => write!(f, "none"), + CompactionStatus::Requested => write!(f, "requested"), + CompactionStatus::InProgress => write!(f, "in progress"), + } + } +} + +/// A group of batches with similar sizes (as determined by [size_from_level]). #[derive(Clone, SizeOf)] struct Slot @@ -166,6 +213,10 @@ where /// Wake up the task that handles merges at this level. #[size_of(skip)] notify: Arc, + + /// State of the compaction process in this slot. 
+    #[size_of(skip)] + compaction_status: CompactionStatus, } impl Default for Slot @@ -181,6 +232,7 @@ where n_merged_batches: 0, n_steps: 0, notify: Arc::new(Notify::new()), + compaction_status: CompactionStatus::None, } } } @@ -216,12 +268,23 @@ where let merge_counts = &MERGE_COUNTS[level]; // Start a merge if there is no ongoing merge and there are either enough loose batches to start a merge, - // or we are under high memory pressure and there's at least one in-memory batch in this slot. + // or we are under high memory pressure and there's at least one in-memory batch in this slot, or + // compaction has been requested and there is more than one batch in the slot. if self.merging_batches.is_none() && (self.loose_batches.len() >= *merge_counts.start() - || self.must_relieve_memory_pressure()) + || self.must_relieve_memory_pressure() + || (self.compaction_status == CompactionStatus::Requested + && self.loose_batches.len() > 1)) { - let n = std::cmp::min(*merge_counts.end(), self.loose_batches.len()); + // Compaction requested - merge all batches in the slot. + let max_batches = if self.compaction_status == CompactionStatus::Requested { + self.compaction_status = CompactionStatus::InProgress; + usize::MAX + } else { + *merge_counts.end() + }; + + let n = std::cmp::min(max_batches, self.loose_batches.len()); let batches = self.loose_batches.drain(..n).collect::>(); self.merging_batches = Some(batches.clone()); Some(batches) @@ -408,7 +471,28 @@ where ) }) .record(); - if !new_batch.is_empty() { + + if slot.compaction_status == CompactionStatus::InProgress { + // We finished merging all batches in the slot as part of compaction. + slot.compaction_status = CompactionStatus::None; + + // If there are more non-empty slots above, push compaction results to the next slot + // and initiate compaction there; otherwise, compaction is finished for the spine. 
+ if let Some(last_level) = self.last_non_empty_slot() + && last_level > level + { + self.initiate_compaction_at_level( + level + 1, + if new_batch.is_empty() { + vec![] + } else { + vec![new_batch] + }, + ); + } else if !new_batch.is_empty() { + self.add_batch(new_batch, new_level); + } + } else if !new_batch.is_empty() { self.add_batch(new_batch, new_level); } } @@ -421,6 +505,49 @@ where fn metadata_snapshot(&self) -> ([Slot; MAX_LEVELS], SpineStats) { (self.slots.clone(), self.spine_stats.clone()) } + + /// Non-empty slot with the smallest index. + fn first_non_empty_slot(&self) -> Option { + for (i, slot) in self.slots.iter().enumerate() { + if !slot.loose_batches.is_empty() || slot.merging_batches.is_some() { + return Some(i); + } + } + None + } + + /// Non-empty slot with the largest index. + fn last_non_empty_slot(&self) -> Option { + for (i, slot) in self.slots.iter().enumerate().rev() { + if !slot.loose_batches.is_empty() || slot.merging_batches.is_some() { + return Some(i); + } + } + None + } + + fn initiate_compaction(&mut self) { + let Some(level) = self.first_non_empty_slot() else { + return; + }; + + self.initiate_compaction_at_level(level, Vec::new()); + } + + /// Push batches to the slot at the given level and initiate compaction. + fn initiate_compaction_at_level(&mut self, level: usize, batches: Vec>) { + let slot = &mut self.slots[level]; + slot.loose_batches.extend(batches); + + // Note: if compaction is already in progress in this slot, this means that the + // user triggered a new compaction run before the previous one completed. In this + // case, we reset the state back to Requested, which will cause the slot to start a new + // round of merging right after the current one before pushing the results to the next + // slot. + // Effectively, we are merging the two compaction sweeps into one. + slot.compaction_status = CompactionStatus::Requested; + slot.notify.notify_one(); + } } /// A fully asynchronous merger. 
@@ -709,6 +836,11 @@ where ])), )]); } + meta.extend([MetricReading::new( + COMPACTION_STATE, + vec![(Cow::Borrowed("slot"), index.to_string().into())], + MetaItem::String(slot.compaction_status.to_string()), + )]); let mut negative_weight_count = 0; let mut has_negative_weight_counts = false; @@ -883,6 +1015,11 @@ where cache_stats.metadata(meta); } + + fn initiate_compaction(&self) { + let mut state = self.state.lock().unwrap(); + state.initiate_compaction(); + } } impl Drop for AsyncMerger @@ -1059,7 +1196,28 @@ where // Figuring out what merges to start requires the lock. Then we drop // the lock to actually start them, in case that's expensive (it // might require creating a file, for example). - let start_merge = self.state.lock().unwrap().slots[level].try_start_merge(level); + let start_merge = { + let mut state = self.state.lock().unwrap(); + let last_non_empty_slot = state.last_non_empty_slot(); + let slot = &mut state.slots[level]; + let start_merge = slot.try_start_merge(level); + + if slot.compaction_status == CompactionStatus::Requested + && slot.merging_batches.is_none() + { + slot.compaction_status = CompactionStatus::None; + + if let Some(last_level) = last_non_empty_slot + && last_level > level + { + let batches = slot.loose_batches.drain(..).collect::>(); + + state.initiate_compaction_at_level(level + 1, batches); + } + } + + start_merge + }; let snapshot = if value_filter .as_ref() @@ -1817,6 +1975,10 @@ where fn metadata(&self, meta: &mut OperatorMeta) { self.merger.metadata(meta); } + + fn initiate_compaction(&self) { + self.merger.initiate_compaction(); + } } impl Spine diff --git a/crates/dbsp/src/trace/test/test_batch.rs b/crates/dbsp/src/trace/test/test_batch.rs index 3513fdced47..df037f926bc 100644 --- a/crates/dbsp/src/trace/test/test_batch.rs +++ b/crates/dbsp/src/trace/test/test_batch.rs @@ -1393,6 +1393,8 @@ where ) -> Result<(), crate::Error> { todo!() } + + fn initiate_compaction(&self) {} } /// Test random sampling methods. 
diff --git a/crates/fda/src/cli.rs b/crates/fda/src/cli.rs index 7fabd08fa82..215973c7116 100644 --- a/crates/fda/src/cli.rs +++ b/crates/fda/src/cli.rs @@ -703,6 +703,12 @@ pub enum PipelineAction { #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] name: String, }, + /// Initiate compaction. + StartCompaction { + /// The name of the pipeline. + #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] + name: String, + }, /// Clear the storage resources of a pipeline. /// /// Note that the pipeline must be stopped before clearing its storage resources. diff --git a/crates/fda/src/main.rs b/crates/fda/src/main.rs index b46ec2b7ce6..28a909342d0 100644 --- a/crates/fda/src/main.rs +++ b/crates/fda/src/main.rs @@ -1910,6 +1910,20 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .unwrap(); println!("Initiated rebalancing for pipeline {name}."); } + PipelineAction::StartCompaction { name } => { + client + .post_pipeline_start_compaction() + .pipeline_name(name.clone()) + .send() + .await + .map_err(handle_errors_fatal( + client.baseurl().clone(), + "Failed to initiate compaction", + 1, + )) + .unwrap(); + println!("Initiated compaction for pipeline {name}."); + } PipelineAction::Bench { args } => bench::bench(client, format, args).await, PipelineAction::DismissError { name } => { let response = client diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index 10b0d62c401..b7069c185d3 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -1145,6 +1145,58 @@ pub(crate) async fn post_pipeline_rebalance( .await } +/// Initiate compaction. +/// +/// Initiate immediate compaction of the pipeline's state. 
+#[utoipa::path( + context_path = "/v0", + security(("JSON web token (JWT) or API key" = [])), + params( + ("pipeline_name" = String, Path, description = "Unique pipeline name"), + ), + responses( + (status = OK + , description = "Compaction started successfully"), + (status = NOT_FOUND + , description = "Pipeline with that name does not exist" + , body = ErrorResponse + , example = json!(examples::error_unknown_pipeline_name())), + (status = SERVICE_UNAVAILABLE + , body = ErrorResponse + , examples( + ("Pipeline is not deployed" = (value = json!(examples::error_pipeline_interaction_not_deployed()))), + ("Pipeline is currently unavailable" = (value = json!(examples::error_pipeline_interaction_currently_unavailable()))), + ("Disconnected during response" = (value = json!(examples::error_pipeline_interaction_disconnected()))), + ("Response timeout" = (value = json!(examples::error_pipeline_interaction_timeout()))) + ) + ), + (status = INTERNAL_SERVER_ERROR, body = ErrorResponse), + ), + tag = "Pipeline Lifecycle" +)] +#[post("/pipelines/{pipeline_name}/start_compaction")] +pub(crate) async fn post_pipeline_start_compaction( + state: WebData, + client: WebData, + tenant_id: ReqData, + path: web::Path, +) -> Result { + let pipeline_name = path.into_inner(); + state + .runner + .forward_http_request_to_pipeline_by_name( + client.as_ref(), + *tenant_id, + &pipeline_name, + Method::POST, + "start_compaction", + "", + Some(Duration::from_secs(120)), + None, + ) + .await +} + /// Sync Checkpoints To S3 /// /// Syncs latest checkpoints to the object store configured in pipeline config. 
diff --git a/crates/pipeline-manager/src/api/main.rs b/crates/pipeline-manager/src/api/main.rs index d9346a3a520..c5db6563efd 100644 --- a/crates/pipeline-manager/src/api/main.rs +++ b/crates/pipeline-manager/src/api/main.rs @@ -234,6 +234,7 @@ It contains the following fields: endpoints::pipeline_interaction::get_pipeline_time_series, endpoints::pipeline_interaction::get_pipeline_time_series_stream, endpoints::pipeline_interaction::post_pipeline_rebalance, + endpoints::pipeline_interaction::post_pipeline_start_compaction, // API keys endpoints::api_key::list_api_keys, @@ -674,6 +675,7 @@ fn api_scope() -> Scope { .service(endpoints::pipeline_interaction::start_samply_profile) .service(endpoints::pipeline_interaction::support_bundle::get_pipeline_support_bundle) .service(endpoints::pipeline_interaction::post_pipeline_rebalance) + .service(endpoints::pipeline_interaction::post_pipeline_start_compaction) .service(endpoints::pipeline_interaction::pipeline_adhoc_sql) .service(endpoints::pipeline_interaction::completion_token) .service(endpoints::pipeline_interaction::completion_status) diff --git a/openapi.json b/openapi.json index 07689adbce3..2b8eab57d55 100644 --- a/openapi.json +++ b/openapi.json @@ -5156,6 +5156,120 @@ ] } }, + "/v0/pipelines/{pipeline_name}/start_compaction": { + "post": { + "tags": [ + "Pipeline Lifecycle" + ], + "summary": "Initiate compaction.", + "description": "Initiate immediate compaction of the pipeline's state.", + "operationId": "post_pipeline_start_compaction", + "parameters": [ + { + "name": "pipeline_name", + "in": "path", + "description": "Unique pipeline name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Compaction started successfully" + }, + "404": { + "description": "Pipeline with that name does not exist", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + }, + "example": { + "message": "Unknown pipeline name 
'non-existent-pipeline'", + "error_code": "UnknownPipelineName", + "details": { + "pipeline_name": "non-existent-pipeline" + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "503": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + }, + "examples": { + "Disconnected during response": { + "value": { + "message": "Error sending HTTP request to pipeline: the pipeline disconnected while it was processing this HTTP request. This could be because the pipeline either (a) encountered a fatal error or panic, (b) was stopped, or (c) experienced network issues -- retrying might help in the last case. Alternatively, check the pipeline logs. Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "the pipeline disconnected while it was processing this HTTP request. This could be because the pipeline either (a) encountered a fatal error or panic, (b) was stopped, or (c) experienced network issues -- retrying might help in the last case. Alternatively, check the pipeline logs." 
+ } + } + }, + "Pipeline is currently unavailable": { + "value": { + "message": "Error sending HTTP request to pipeline: deployment status is currently 'unavailable' -- wait for it to become 'running' or 'paused' again Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "deployment status is currently 'unavailable' -- wait for it to become 'running' or 'paused' again" + } + } + }, + "Pipeline is not deployed": { + "value": { + "message": "Unable to interact with pipeline because the deployment status (stopped) indicates it is not (yet) fully provisioned pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionNotDeployed", + "details": { + "pipeline_name": "my_pipeline", + "status": "Stopped", + "desired_status": "Provisioned" + } + } + }, + "Response timeout": { + "value": { + "message": "Error sending HTTP request to pipeline: timeout (10s) was reached: this means the pipeline took too long to respond -- this can simply be because the request was too difficult to process in time, or other reasons (e.g., deadlock): the pipeline logs might contain additional information (original send request error: Timeout while waiting for response) Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "timeout (10s) was reached: this means the pipeline took too long to respond -- this can simply be because the request was too difficult to process in time, or other reasons (e.g., deadlock): the pipeline logs might contain additional information (original send request error: Timeout while waiting for response)" + } + } + } + } + } + } + } + }, + "security": [ + { + "JSON web token (JWT) or API key": [] + } + ] + } + }, 
"/v0/pipelines/{pipeline_name}/start_transaction": { "post": { "tags": [ diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index 4d43e4eb995..44abb0783de 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -1566,6 +1566,13 @@ def rebalance(self): self.client.rebalance_pipeline(self.name) + def start_compaction(self): + """ + Initiate immediate compaction of the pipeline's state. + """ + + self.client.start_compaction_pipeline(self.name) + def generate_completion_token(self, table_name: str, connector_name: str) -> str: """ Returns a completion token that can be passed to :meth:`.Pipeline.completion_token_status` to diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index bc396cd49f2..119e25a1412 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -1470,6 +1470,9 @@ def get_cluster_event(self, event_id: str, selector: str = "status") -> dict: def rebalance_pipeline(self, pipeline_name: str): self.http.post(path=f"/pipelines/{pipeline_name}/rebalance") + def start_compaction_pipeline(self, pipeline_name: str): + self.http.post(path=f"/pipelines/{pipeline_name}/start_compaction") + def get_checkpoints(self, pipeline_name: str): return self.http.get(path=f"/pipelines/{pipeline_name}/checkpoints") diff --git a/python/tests/platform/test_shared_pipeline.py b/python/tests/platform/test_shared_pipeline.py index 38cc2c1484a..1f67a06d085 100644 --- a/python/tests/platform/test_shared_pipeline.py +++ b/python/tests/platform/test_shared_pipeline.py @@ -92,6 +92,17 @@ def test_get_pipeline_stats(self): assert stats.get("inputs") is not None assert stats.get("outputs") is not None + def test_start_compaction(self): + self.pipeline.start() + self.pipeline.input_json("tbl", [{"id": 1}, {"id": 2}]) + + before = list(self.pipeline.query("SELECT COUNT(*) AS num_rows FROM v0")) + self.pipeline.start_compaction() + self.pipeline.start_compaction() + after = 
list(self.pipeline.query("SELECT COUNT(*) AS num_rows FROM v0")) + + assert after == before + def test_case_sensitive_views_listen(self): self.pipeline.start_paused()