From 408e68117e89cbb8104371f76939c517e7a697aa Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Fri, 8 May 2026 17:01:17 -0700 Subject: [PATCH] [dbsp] Compaction API. Add a `/start_compaction` endpoint, which triggers background compaction of all traces in the circuit. Compaction starts by merging all batches at level 1 and pushing the resulting batch to level 2, regardless of its size, we then do the same at level 2, etc. Compaction does not stall the circuit. Compaction is useful for example to consolidate all negative weights in the pipeline. Signed-off-by: Leonid Ryzhyk --- crates/adapters/src/controller.rs | 22 ++ crates/adapters/src/server.rs | 7 + crates/dbsp/src/circuit/circuit_builder.rs | 66 ++++++ crates/dbsp/src/circuit/dbsp_handle.rs | 13 ++ crates/dbsp/src/circuit/metadata.rs | 9 +- crates/dbsp/src/circuit/operator_traits.rs | 5 + .../src/operator/dynamic/accumulate_trace.rs | 6 + .../operator/dynamic/multijoin/match_keys.rs | 2 + crates/dbsp/src/operator/dynamic/trace.rs | 204 +++++++++++++++++- crates/dbsp/src/trace.rs | 2 + crates/dbsp/src/trace/spine_async.rs | 174 ++++++++++++++- crates/dbsp/src/trace/test/test_batch.rs | 2 + crates/fda/src/cli.rs | 6 + crates/fda/src/main.rs | 14 ++ .../src/api/endpoints/pipeline_interaction.rs | 52 +++++ crates/pipeline-manager/src/api/main.rs | 2 + openapi.json | 114 ++++++++++ python/feldera/pipeline.py | 7 + python/feldera/rest/feldera_client.py | 3 + python/tests/platform/test_shared_pipeline.py | 11 + 20 files changed, 713 insertions(+), 8 deletions(-) diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 49ce170204b..0ca7059c2f4 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -470,6 +470,8 @@ pub type SyncCheckpointCallbackFn = Box) + Send>; +pub type StartCompactionCallbackFn = Box) + Send>; + /// A command that [Controller] can send to [Controller::circuit_thread]. /// /// There is no type for a command reply. Instead, the command implementation @@ -481,6 +483,7 @@ enum Command { Suspend(SuspendCallbackFn), SyncCheckpoint((uuid::Uuid, SyncCheckpointCallbackFn)), Rebalance(RebalanceCallbackFn), + StartCompaction(StartCompactionCallbackFn), } impl Command { @@ -496,6 +499,7 @@ impl Command { callback(Err(Arc::new(ControllerError::ControllerExit))) } Command::Rebalance(callback) => callback(Err(ControllerError::ControllerExit)), + Command::StartCompaction(callback) => callback(Err(ControllerError::ControllerExit)), } } } @@ -1989,6 +1993,19 @@ impl Controller { Ok(()) } + pub async fn start_compaction(&self) -> Result<(), ControllerError> { + let (sender, receiver) = oneshot::channel(); + self.inner + .send_command(Command::StartCompaction(Box::new(move |result| { + if sender.send(result).is_err() { + error!("`/start_compaction` result could not be sent"); + } + }))); + self.inner.circuit_thread_unparker.unpark(); + receiver.await.unwrap()?; + Ok(()) + } + /// Returns an object for monitoring the step that the controller has /// completed. pub fn step_watcher(&self) -> tokio::sync::watch::Receiver { @@ -3332,6 +3349,11 @@ impl CircuitThread { .rebalance() .map_err(ControllerError::dbsp_error), ), + Command::StartCompaction(reply_callback) => reply_callback( + self.circuit + .start_compaction() + .map_err(ControllerError::dbsp_error), + ), } } } diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index 22027506c61..fb5ffc05dd9 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -1238,6 +1238,7 @@ where .service(reset_output_endpoint) .service(reset_status) .service(rebalance) + .service(start_compaction) .service(coordination_activate_handler) .service(coordination_status) .service(coordination_step_request) @@ -2444,6 +2445,12 @@ async fn rebalance(state: WebData) -> Result) -> Result { + state.controller()?.start_compaction().await?; + Ok(HttpResponse::Ok().into()) +} + #[post("/coordination/activate")] async fn coordination_activate_handler( state: WebData, diff --git a/crates/dbsp/src/circuit/circuit_builder.rs b/crates/dbsp/src/circuit/circuit_builder.rs index bb8895e23f1..619bb2d0a94 100644 --- a/crates/dbsp/src/circuit/circuit_builder.rs +++ b/crates/dbsp/src/circuit/circuit_builder.rs @@ -1098,6 +1098,9 @@ pub trait Node: Any { /// from a checkpoint must be backfilled from clean state. fn clear_state(&mut self) -> Result<(), DbspError>; + /// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates. + fn start_compaction(&mut self); + /// Place operator in the replay mode. /// /// In the replay mode the operator streams its stored state to a temporary @@ -1836,6 +1839,8 @@ pub trait CircuitBase: 'static { ) -> Result; fn rebalance(&self); + + fn start_compaction(&self); } /// The circuit interface. All DBSP computation takes place within a circuit. @@ -3506,6 +3511,13 @@ where fn rebalance(&self) { self.inner().balancer.rebalance() } + + fn start_compaction(&self) { + let _ = self.map_local_nodes_mut(&mut |node| { + node.start_compaction(); + Ok(()) + }); + } } impl Circuit for ChildCircuit @@ -4673,6 +4685,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -4820,6 +4836,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -4981,6 +5001,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5135,6 +5159,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5346,6 +5374,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5533,6 +5565,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5744,6 +5780,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -5929,6 +5969,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -6135,6 +6179,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -6326,6 +6374,10 @@ where self.operator.restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.clear_state() } @@ -6507,6 +6559,10 @@ where .restore(base, self.persistent_id().as_deref()) } + fn start_compaction(&mut self) { + self.operator.borrow_mut().start_compaction() + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.operator.borrow_mut().clear_state() } @@ -6672,6 +6728,8 @@ where Ok(()) } + fn start_compaction(&mut self) {} + fn clear_state(&mut self) -> Result<(), DbspError> { Ok(()) } @@ -6900,6 +6958,10 @@ where Ok(()) } + fn start_compaction(&mut self) { + self.circuit.start_compaction(); + } + fn clear_state(&mut self) -> Result<(), DbspError> { self.circuit .map_local_nodes_mut(&mut |node| node.clear_state()) @@ -7669,6 +7731,10 @@ impl CircuitHandle { pub fn rebalance(&self) { self.circuit.rebalance() } + + pub fn start_compaction(&self) { + self.circuit.start_compaction() + } } pin_project! { diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 048ffaea610..82daa97cea7 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -860,6 +860,12 @@ impl Runtime { return; } } + Ok(Command::StartCompaction) => { + circuit.start_compaction(); + if status_sender.send(Ok(Response::Unit)).is_err() { + return; + } + } // Nothing to do: do some housekeeping and relinquish the CPU if there's none // left. Err(TryRecvError::Empty) => { @@ -953,6 +959,7 @@ enum Command { GetCurrentBalancerPolicy(String), Rebalance, SetAutoRebalance(bool), + StartCompaction, } impl Debug for Command { @@ -994,6 +1001,7 @@ impl Debug for Command { Command::SetAutoRebalance(enable) => { f.debug_tuple("SetAutoRebalance").field(enable).finish() } + Command::StartCompaction => write!(f, "StartCompaction"), } } } @@ -1737,6 +1745,11 @@ impl DBSPHandle { self.broadcast_command(Command::Rebalance, |_, _| {})?; Ok(()) } + + pub fn start_compaction(&mut self) -> Result<(), DbspError> { + self.broadcast_command(Command::StartCompaction, |_, _| {})?; + Ok(()) + } } impl Drop for DBSPHandle { diff --git a/crates/dbsp/src/circuit/metadata.rs b/crates/dbsp/src/circuit/metadata.rs index d0c3001b97b..da46f24dd38 100644 --- a/crates/dbsp/src/circuit/metadata.rs +++ b/crates/dbsp/src/circuit/metadata.rs @@ -127,6 +127,7 @@ pub const MERGING_MEMORY_RECORDS_COUNT: MetricId = pub const MERGING_STORAGE_RECORDS_COUNT: MetricId = MetricId(Cow::Borrowed("merging_storage_records_count")); pub const COMPLETED_MERGES: MetricId = MetricId(Cow::Borrowed("completed_merges")); +pub const COMPACTION_STATE: MetricId = MetricId(Cow::Borrowed("compaction_state")); pub const NEGATIVE_WEIGHT_COUNT: MetricId = MetricId(Cow::Borrowed("negative_weight_count")); pub const BLOOM_FILTER_BITS_PER_KEY: MetricId = MetricId(Cow::Borrowed("bloom_filter_bits_per_key")); @@ -175,7 +176,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId = MetricId(Cow::Borrowed("input_integral_records_count")); -pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [ +pub const CIRCUIT_METRICS: [CircuitMetric; 75] = [ // State CircuitMetric { name: USED_MEMORY_BYTES, @@ -369,6 +370,12 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 74] = [ advanced: true, description: "Information about the batches that were compacted (merged).", }, + CircuitMetric { + name: COMPACTION_STATE, + category: CircuitMetricCategory::State, + advanced: true, + description: "State of the compaction process.", + }, // Inputs CircuitMetric { name: INPUT_RECORDS_COUNT, diff --git a/crates/dbsp/src/circuit/operator_traits.rs b/crates/dbsp/src/circuit/operator_traits.rs index 8036fb423ea..da57e96ef9e 100644 --- a/crates/dbsp/src/circuit/operator_traits.rs +++ b/crates/dbsp/src/circuit/operator_traits.rs @@ -305,6 +305,11 @@ pub trait Operator: 'static { fn flush_progress(&self) -> Option { None } + + /// Start compaction of the operator's state. + /// + /// Only defined for operators that support compaction. No-op for all other operators. + fn start_compaction(&mut self) {} } /// A source operator that injects data from the outside world or from the diff --git a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs index a89afef8956..829be69c111 100644 --- a/crates/dbsp/src/operator/dynamic/accumulate_trace.rs +++ b/crates/dbsp/src/operator/dynamic/accumulate_trace.rs @@ -1101,6 +1101,12 @@ where fn is_flush_complete(&self) -> bool { !self.flush_output } + + fn start_compaction(&mut self) { + if let Some(trace) = self.trace.as_mut() { + trace.initiate_compaction() + } + } } impl StrictOperator for AccumulateZ1Trace diff --git a/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs b/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs index 7f51d843f1c..ce2414876de 100644 --- a/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs +++ b/crates/dbsp/src/operator/dynamic/multijoin/match_keys.rs @@ -862,6 +862,8 @@ where Ok(()) } + fn start_compaction(&mut self) {} + fn eval<'a>( &'a mut self, ) -> Pin, SchedulerError>> + 'a>> { diff --git a/crates/dbsp/src/operator/dynamic/trace.rs b/crates/dbsp/src/operator/dynamic/trace.rs index 43a83d4eb32..2d90686a3f2 100644 --- a/crates/dbsp/src/operator/dynamic/trace.rs +++ b/crates/dbsp/src/operator/dynamic/trace.rs @@ -1168,6 +1168,12 @@ where fn is_flush_complete(&self) -> bool { !self.flush_output } + + fn start_compaction(&mut self) { + if let Some(trace) = self.trace.as_mut() { + trace.initiate_compaction() + } + } } impl StrictOperator for Z1Trace @@ -1303,12 +1309,18 @@ mod test { use std::{ cmp::max, collections::{BTreeMap, BTreeSet}, + thread, + time::{Duration, Instant}, }; use crate::{ OrdIndexedZSet, Runtime, Stream, TypedBox, ZWeight, + circuit::{ + CircuitConfig, GlobalNodeId, + metadata::{MetaItem, SPINE_BATCHES_COUNT}, + }, dynamic::DynData, - typed_batch::{self, Spine}, + typed_batch::{self, IndexedZSetReader, Spine}, utils::Tup2, }; use proptest::{collection::vec, prelude::*}; @@ -1643,6 +1655,196 @@ mod test { test_integrate_trace_retain(batches, 10, 10); } + fn spine_batches_count_values(profile: &crate::profile::DbspProfile) -> Vec<(String, usize)> { + let root = GlobalNodeId::root(); + let operator_names = spine_operator_names(profile); + + // The root profile entry merges child metrics, so only inspect individual operators. + let mut counts = profile + .worker_profiles + .iter() + .flat_map(|profile| profile.attribute_profile(&SPINE_BATCHES_COUNT)) + .filter_map(|(node_id, value)| { + (node_id != root).then(|| match value { + MetaItem::Count(count) => { + let operator_name = operator_names + .get(&node_id.node_identifier()) + .cloned() + .unwrap_or_else(|| node_id.to_string()); + (operator_name, count) + } + value => panic!("spine_batches_count must be serialized as a count: {value:?}"), + }) + }) + .collect::>(); + + counts.sort(); + counts + } + + fn spine_operator_names(profile: &crate::profile::DbspProfile) -> BTreeMap { + let mut operator_names = BTreeMap::new(); + + if let Some(graph) = &profile.graph { + let graph_json: serde_json::Value = serde_json::from_str(&graph.to_json()).unwrap(); + collect_operator_names(&graph_json, &mut operator_names); + } + + operator_names + } + + fn collect_operator_names( + value: &serde_json::Value, + operator_names: &mut BTreeMap, + ) { + match value { + serde_json::Value::Object(object) => { + if let (Some(id), Some(label)) = ( + object.get("id").and_then(serde_json::Value::as_str), + object.get("label").and_then(serde_json::Value::as_str), + ) { + let name = label + .split("\\l") + .next() + .unwrap_or(label) + .split(" @ ") + .next() + .unwrap_or(label); + operator_names.insert(id.to_owned(), name.to_owned()); + } + + for value in object.values() { + collect_operator_names(value, operator_names); + } + } + serde_json::Value::Array(array) => { + for value in array { + collect_operator_names(value, operator_names); + } + } + _ => {} + } + } + + #[test] + fn test_start_compaction_accumulate_integrate_trace() { + const NUM_TRACES: usize = 3; + const ITERATIONS: usize = 3; + const BATCHES_PER_ITERATION: usize = 20; + const TOTAL_BATCHES: usize = BATCHES_PER_ITERATION * ITERATIONS * (ITERATIONS + 1) / 2; + const RECORDS_PER_BATCH: usize = 2000; + + let storage_dir = tempfile::tempdir().unwrap(); + let circuit_config = + CircuitConfig::with_workers(2).with_temporary_storage(storage_dir.path()); + let (mut dbsp, (input_handles, trace_outputs)) = + Runtime::init_circuit(circuit_config, move |circuit| { + // Build several independent trace integrals so one compaction request has to visit + // multiple `accumulate_integrate_trace` operators in a storage-enabled circuit. + let mut input_handles = Vec::new(); + let mut trace_outputs = Vec::new(); + + for _ in 0..NUM_TRACES { + let (stream, handle) = circuit.add_input_indexed_zset::(); + let trace = stream.shard().accumulate_integrate_trace(); + let output = trace + .apply(|trace| trace.ro_snapshot().consolidate()) + .output(); + input_handles.push(handle); + trace_outputs.push(output); + } + + Ok((input_handles, trace_outputs)) + }) + .unwrap(); + + let mut next_batch = 0; + for iteration in 0..ITERATIONS { + // Each iteration appends more batches than the previous one. This exercises + // repeated compaction on traces that keep receiving additional input. + let batches_this_iteration = (iteration + 1) * BATCHES_PER_ITERATION; + let end_batch = next_batch + batches_this_iteration; + + for batch in next_batch..end_batch { + for (trace, input_handle) in input_handles.iter().enumerate() { + let mut tuples = Vec::with_capacity(RECORDS_PER_BATCH); + for record in 0..RECORDS_PER_BATCH { + // Use disjoint key ranges per trace so the output validation can + // reconstruct the expected integral exactly. + let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH + + batch * RECORDS_PER_BATCH + + record) as i32; + tuples.push(Tup2(key, Tup2(record as i32, 1))); + } + + input_handle.append(&mut tuples); + } + + dbsp.transaction().unwrap(); + } + + next_batch = end_batch; + + let profile = dbsp.retrieve_profile().unwrap(); + let counts = spine_batches_count_values(&profile); + println!( + "SPINE_BATCHES_COUNT values before compaction iteration {iteration}: {counts:?}" + ); + assert!( + !counts.is_empty(), + "profile should contain at least one SPINE_BATCHES_COUNT metric" + ); + + dbsp.start_compaction().unwrap(); + thread::sleep(Duration::from_millis(100)); + // Starting compaction again while the previous request is still making progress + // should be idempotent. + dbsp.start_compaction().unwrap(); + + let deadline = Instant::now() + Duration::from_secs(60); + loop { + let profile = dbsp.retrieve_profile().unwrap(); + let counts = spine_batches_count_values(&profile); + println!( + "SPINE_BATCHES_COUNT values after compaction request iteration {iteration}: {counts:?}" + ); + assert!( + !counts.is_empty(), + "profile should contain at least one SPINE_BATCHES_COUNT metric" + ); + + if counts.iter().all(|(_, count)| *count <= 1) { + break; + } + + assert!( + Instant::now() < deadline, + "timed out waiting for all SPINE_BATCHES_COUNT values to be <= 1: {counts:?}" + ); + thread::sleep(Duration::from_secs(5)); + } + + // After all spines report at most one batch, run one more logical step to + // materialize the trace outputs and verify compaction preserved every record. + dbsp.transaction().unwrap(); + for (trace, output) in trace_outputs.iter().enumerate() { + let output = output.consolidate(); + let mut actual_tuples = output.iter(); + + for batch in 0..next_batch { + for record in 0..RECORDS_PER_BATCH { + let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH + + batch * RECORDS_PER_BATCH + + record) as i32; + assert_eq!(actual_tuples.next(), Some((key, record as i32, 1))); + } + } + + assert_eq!(actual_tuples.next(), None); + } + } + } + proptest! { #![proptest_config(ProptestConfig::with_cases(16))] diff --git a/crates/dbsp/src/trace.rs b/crates/dbsp/src/trace.rs index 607b6ced249..a397c473325 100644 --- a/crates/dbsp/src/trace.rs +++ b/crates/dbsp/src/trace.rs @@ -313,6 +313,8 @@ pub trait Trace: BatchReader { /// Allows the trace to report additional metadata. fn metadata(&self, _meta: &mut OperatorMeta) {} + + fn initiate_compaction(&self); } /// Where a batch is stored. diff --git a/crates/dbsp/src/trace/spine_async.rs b/crates/dbsp/src/trace/spine_async.rs index 7c46fc5afa7..4bf73ed7f94 100644 --- a/crates/dbsp/src/trace/spine_async.rs +++ b/crates/dbsp/src/trace/spine_async.rs @@ -12,7 +12,7 @@ use crate::{ max_level0_batch_size_records, metadata::{ BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_HIT_RATE_PERCENT, BLOOM_FILTER_HITS_COUNT, - BLOOM_FILTER_MISSES_COUNT, BLOOM_FILTER_SIZE_BYTES, COMPLETED_MERGES, + BLOOM_FILTER_MISSES_COUNT, BLOOM_FILTER_SIZE_BYTES, COMPACTION_STATE, COMPLETED_MERGES, LOOSE_BATCHES_COUNT, LOOSE_MEMORY_RECORDS_COUNT, LOOSE_STORAGE_RECORDS_COUNT, MERGE_BACKPRESSURE_WAIT_TIME_SECONDS, MERGE_REDUCTION_PERCENT, MERGING_BATCHES_COUNT, MERGING_MEMORY_RECORDS_COUNT, MERGING_SIZE_BYTES, MERGING_STORAGE_RECORDS_COUNT, @@ -134,6 +134,53 @@ impl From<(Vec, &Spine)> for CommittedSpine { } } +/// State of the compaction process in a slot. +/// +/// ```text +/// │ +/// ▼ +/// ┌──────┐ ┌─────────┐ +/// │ none ├───────────────────────────►│requested│ +/// └──────┘ └────┬────┘ +/// ▲ │ +/// │ │ +/// │ │ +/// │ │ +/// │ ┌───────────┐ │ +/// └─────────┤in progress│◄──────────────┘ +/// └───────────┘ +/// ``` +/// +/// * none -> requested: +/// - triggered by the `initiate_compaction` method in the first non-empty slot of the spine. +/// - triggered when compaction completes in the previous slot. +/// * requested -> in progress: +/// - triggered when the slot finishes processing any ongoing merge and discovers that +/// compaction status has been set to requested. It starts a new merge including _all_ +/// batches at the current level. +/// * in progress -> none: +/// - the merge completes; the resulting batch is pushed to the next slot regardless of +/// its size; compaction is initiated at the next slot. +/// * requested -> none: +/// - there is <=1 batches in the current slot. Compaction completes instantly and the +/// batch is pushed to the next slot; compaction is initiated at the next slot. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CompactionStatus { + None, + Requested, + InProgress, +} + +impl Display for CompactionStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CompactionStatus::None => write!(f, "none"), + CompactionStatus::Requested => write!(f, "requested"), + CompactionStatus::InProgress => write!(f, "in progress"), + } + } +} + /// A group of batches with similar sizes (as determined by [size_from_level]). #[derive(Clone, SizeOf)] struct Slot @@ -166,6 +213,10 @@ where /// Wake up the task that handles merges at this level. #[size_of(skip)] notify: Arc, + + /// State of the compaction process in this slot. + #[size_of(skip)] + compaction_status: CompactionStatus, } impl Default for Slot @@ -181,6 +232,7 @@ where n_merged_batches: 0, n_steps: 0, notify: Arc::new(Notify::new()), + compaction_status: CompactionStatus::None, } } } @@ -216,12 +268,23 @@ where let merge_counts = &MERGE_COUNTS[level]; // Start a merge if there is no ongoing merge and there are either enough loose batches to start a merge, - // or we are under high memory pressure and there's at least one in-memory batch in this slot. + // or we are under high memory pressure and there's at least one in-memory batch in this slot, or + // compaction has been requested and there are more than one batches in the slot. if self.merging_batches.is_none() && (self.loose_batches.len() >= *merge_counts.start() - || self.must_relieve_memory_pressure()) + || self.must_relieve_memory_pressure() + || (self.compaction_status == CompactionStatus::Requested + && self.loose_batches.len() > 1)) { - let n = std::cmp::min(*merge_counts.end(), self.loose_batches.len()); + // Compaction requested - merge all batches in the slot. + let max_batches = if self.compaction_status == CompactionStatus::Requested { + self.compaction_status = CompactionStatus::InProgress; + usize::MAX + } else { + *merge_counts.end() + }; + + let n = std::cmp::min(max_batches, self.loose_batches.len()); let batches = self.loose_batches.drain(..n).collect::>(); self.merging_batches = Some(batches.clone()); Some(batches) @@ -408,7 +471,28 @@ where ) }) .record(); - if !new_batch.is_empty() { + + if slot.compaction_status == CompactionStatus::InProgress { + // We finished merging all batches in the slot as part of compaction. + slot.compaction_status = CompactionStatus::None; + + // If there are more non-empty slots above, push compaction results to the next slot + // and initiate compaction there; otherwise, compaction is finished for the spine. + if let Some(last_level) = self.last_non_empty_slot() + && last_level > level + { + self.initiate_compaction_at_level( + level + 1, + if new_batch.is_empty() { + vec![] + } else { + vec![new_batch] + }, + ); + } else if !new_batch.is_empty() { + self.add_batch(new_batch, new_level); + } + } else if !new_batch.is_empty() { self.add_batch(new_batch, new_level); } } @@ -421,6 +505,49 @@ where fn metadata_snapshot(&self) -> ([Slot; MAX_LEVELS], SpineStats) { (self.slots.clone(), self.spine_stats.clone()) } + + /// Non-empty slot with the smallest index. + fn first_non_empty_slot(&self) -> Option { + for (i, slot) in self.slots.iter().enumerate() { + if !slot.loose_batches.is_empty() || slot.merging_batches.is_some() { + return Some(i); + } + } + None + } + + /// Non-empty slot with the largest index. + fn last_non_empty_slot(&self) -> Option { + for (i, slot) in self.slots.iter().enumerate().rev() { + if !slot.loose_batches.is_empty() || slot.merging_batches.is_some() { + return Some(i); + } + } + None + } + + fn initiate_compaction(&mut self) { + let Some(level) = self.first_non_empty_slot() else { + return; + }; + + self.initiate_compaction_at_level(level, Vec::new()); + } + + /// Push batches to the slot at the given level and initiate compaction. + fn initiate_compaction_at_level(&mut self, level: usize, batches: Vec>) { + let slot = &mut self.slots[level]; + slot.loose_batches.extend(batches); + + // Note: if compaction is already in progress in this slot, this means that the + // user triggered a new compaction run before the previous one completed. In this + // case, we reset the state back to Requested, which will cause the slot to start a new + // round of merging right after the current one before pushing the results to the next + // slot. + // Effectively, we are merging the two compaction sweeps into one. + slot.compaction_status = CompactionStatus::Requested; + slot.notify.notify_one(); + } } /// A fully asynchronous merger. @@ -709,6 +836,11 @@ where ])), )]); } + meta.extend([MetricReading::new( + COMPACTION_STATE, + vec![(Cow::Borrowed("slot"), index.to_string().into())], + MetaItem::String(slot.compaction_status.to_string()), + )]); let mut negative_weight_count = 0; let mut has_negative_weight_counts = false; @@ -883,6 +1015,11 @@ where cache_stats.metadata(meta); } + + fn initiate_compaction(&self) { + let mut state = self.state.lock().unwrap(); + state.initiate_compaction(); + } } impl Drop for AsyncMerger @@ -1059,7 +1196,28 @@ where // Figuring out what merges to start requires the lock. Then we drop // the lock to actually start them, in case that's expensive (it // might require creating a file, for example). - let start_merge = self.state.lock().unwrap().slots[level].try_start_merge(level); + let start_merge = { + let mut state = self.state.lock().unwrap(); + let last_non_empty_slot = state.last_non_empty_slot(); + let slot = &mut state.slots[level]; + let start_merge = slot.try_start_merge(level); + + if slot.compaction_status == CompactionStatus::Requested + && slot.merging_batches.is_none() + { + slot.compaction_status = CompactionStatus::None; + + if let Some(last_level) = last_non_empty_slot + && last_level > level + { + let batches = slot.loose_batches.drain(..).collect::>(); + + state.initiate_compaction_at_level(level + 1, batches); + } + } + + start_merge + }; let snapshot = if value_filter .as_ref() @@ -1817,6 +1975,10 @@ where fn metadata(&self, meta: &mut OperatorMeta) { self.merger.metadata(meta); } + + fn initiate_compaction(&self) { + self.merger.initiate_compaction(); + } } impl Spine diff --git a/crates/dbsp/src/trace/test/test_batch.rs b/crates/dbsp/src/trace/test/test_batch.rs index 3513fdced47..df037f926bc 100644 --- a/crates/dbsp/src/trace/test/test_batch.rs +++ b/crates/dbsp/src/trace/test/test_batch.rs @@ -1393,6 +1393,8 @@ where ) -> Result<(), crate::Error> { todo!() } + + fn initiate_compaction(&self) {} } /// Test random sampling methods. diff --git a/crates/fda/src/cli.rs b/crates/fda/src/cli.rs index 7fabd08fa82..215973c7116 100644 --- a/crates/fda/src/cli.rs +++ b/crates/fda/src/cli.rs @@ -703,6 +703,12 @@ pub enum PipelineAction { #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] name: String, }, + /// Initiate compaction. + StartCompaction { + /// The name of the pipeline. + #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] + name: String, + }, /// Clear the storage resources of a pipeline. /// /// Note that the pipeline must be stopped before clearing its storage resources. diff --git a/crates/fda/src/main.rs b/crates/fda/src/main.rs index b46ec2b7ce6..28a909342d0 100644 --- a/crates/fda/src/main.rs +++ b/crates/fda/src/main.rs @@ -1910,6 +1910,20 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .unwrap(); println!("Initiated rebalancing for pipeline {name}."); } + PipelineAction::StartCompaction { name } => { + client + .post_pipeline_start_compaction() + .pipeline_name(name.clone()) + .send() + .await + .map_err(handle_errors_fatal( + client.baseurl().clone(), + "Failed to initiate compaction", + 1, + )) + .unwrap(); + println!("Initiated compaction for pipeline {name}."); + } PipelineAction::Bench { args } => bench::bench(client, format, args).await, PipelineAction::DismissError { name } => { let response = client diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index 10b0d62c401..b7069c185d3 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -1145,6 +1145,58 @@ pub(crate) async fn post_pipeline_rebalance( .await } +/// Initiate compaction. +/// +/// Initiate immediate compaction of the pipeline's state. +#[utoipa::path( + context_path = "/v0", + security(("JSON web token (JWT) or API key" = [])), + params( + ("pipeline_name" = String, Path, description = "Unique pipeline name"), + ), + responses( + (status = OK + , description = "Compaction started successfully"), + (status = NOT_FOUND + , description = "Pipeline with that name does not exist" + , body = ErrorResponse + , example = json!(examples::error_unknown_pipeline_name())), + (status = SERVICE_UNAVAILABLE + , body = ErrorResponse + , examples( + ("Pipeline is not deployed" = (value = json!(examples::error_pipeline_interaction_not_deployed()))), + ("Pipeline is currently unavailable" = (value = json!(examples::error_pipeline_interaction_currently_unavailable()))), + ("Disconnected during response" = (value = json!(examples::error_pipeline_interaction_disconnected()))), + ("Response timeout" = (value = json!(examples::error_pipeline_interaction_timeout()))) + ) + ), + (status = INTERNAL_SERVER_ERROR, body = ErrorResponse), + ), + tag = "Pipeline Lifecycle" +)] +#[post("/pipelines/{pipeline_name}/start_compaction")] +pub(crate) async fn post_pipeline_start_compaction( + state: WebData, + client: WebData, + tenant_id: ReqData, + path: web::Path, +) -> Result { + let pipeline_name = path.into_inner(); + state + .runner + .forward_http_request_to_pipeline_by_name( + client.as_ref(), + *tenant_id, + &pipeline_name, + Method::POST, + "start_compaction", + "", + Some(Duration::from_secs(120)), + None, + ) + .await +} + /// Sync Checkpoints To S3 /// /// Syncs latest checkpoints to the object store configured in pipeline config. diff --git a/crates/pipeline-manager/src/api/main.rs b/crates/pipeline-manager/src/api/main.rs index d9346a3a520..c5db6563efd 100644 --- a/crates/pipeline-manager/src/api/main.rs +++ b/crates/pipeline-manager/src/api/main.rs @@ -234,6 +234,7 @@ It contains the following fields: endpoints::pipeline_interaction::get_pipeline_time_series, endpoints::pipeline_interaction::get_pipeline_time_series_stream, endpoints::pipeline_interaction::post_pipeline_rebalance, + endpoints::pipeline_interaction::post_pipeline_start_compaction, // API keys endpoints::api_key::list_api_keys, @@ -674,6 +675,7 @@ fn api_scope() -> Scope { .service(endpoints::pipeline_interaction::start_samply_profile) .service(endpoints::pipeline_interaction::support_bundle::get_pipeline_support_bundle) .service(endpoints::pipeline_interaction::post_pipeline_rebalance) + .service(endpoints::pipeline_interaction::post_pipeline_start_compaction) .service(endpoints::pipeline_interaction::pipeline_adhoc_sql) .service(endpoints::pipeline_interaction::completion_token) .service(endpoints::pipeline_interaction::completion_status) diff --git a/openapi.json b/openapi.json index 07689adbce3..2b8eab57d55 100644 --- a/openapi.json +++ b/openapi.json @@ -5156,6 +5156,120 @@ ] } }, + "/v0/pipelines/{pipeline_name}/start_compaction": { + "post": { + "tags": [ + "Pipeline Lifecycle" + ], + "summary": "Initiate compaction.", + "description": "Initiate immediate compaction of the pipeline's state.", + "operationId": "post_pipeline_start_compaction", + "parameters": [ + { + "name": "pipeline_name", + "in": "path", + "description": "Unique pipeline name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Compaction started successfully" + }, + "404": { + "description": "Pipeline with that name does not exist", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + }, + "example": { + "message": "Unknown pipeline name 'non-existent-pipeline'", + "error_code": "UnknownPipelineName", + "details": { + "pipeline_name": "non-existent-pipeline" + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "503": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + }, + "examples": { + "Disconnected during response": { + "value": { + "message": "Error sending HTTP request to pipeline: the pipeline disconnected while it was processing this HTTP request. This could be because the pipeline either (a) encountered a fatal error or panic, (b) was stopped, or (c) experienced network issues -- retrying might help in the last case. Alternatively, check the pipeline logs. Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "the pipeline disconnected while it was processing this HTTP request. This could be because the pipeline either (a) encountered a fatal error or panic, (b) was stopped, or (c) experienced network issues -- retrying might help in the last case. Alternatively, check the pipeline logs." + } + } + }, + "Pipeline is currently unavailable": { + "value": { + "message": "Error sending HTTP request to pipeline: deployment status is currently 'unavailable' -- wait for it to become 'running' or 'paused' again Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "deployment status is currently 'unavailable' -- wait for it to become 'running' or 'paused' again" + } + } + }, + "Pipeline is not deployed": { + "value": { + "message": "Unable to interact with pipeline because the deployment status (stopped) indicates it is not (yet) fully provisioned pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionNotDeployed", + "details": { + "pipeline_name": "my_pipeline", + "status": "Stopped", + "desired_status": "Provisioned" + } + } + }, + "Response timeout": { + "value": { + "message": "Error sending HTTP request to pipeline: timeout (10s) was reached: this means the pipeline took too long to respond -- this can simply be because the request was too difficult to process in time, or other reasons (e.g., deadlock): the pipeline logs might contain additional information (original send request error: Timeout while waiting for response) Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "timeout (10s) was reached: this means the pipeline took too long to respond -- this can simply be because the request was too difficult to process in time, or other reasons (e.g., deadlock): the pipeline logs might contain additional information (original send request error: Timeout while waiting for response)" + } + } + } + } + } + } + } + }, + "security": [ + { + "JSON web token (JWT) or API key": [] + } + ] + } + }, "/v0/pipelines/{pipeline_name}/start_transaction": { "post": { "tags": [ diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index 4d43e4eb995..44abb0783de 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -1566,6 +1566,13 @@ def rebalance(self): self.client.rebalance_pipeline(self.name) + def start_compaction(self): + """ + Initiate immediate compaction of the pipeline's state. + """ + + self.client.start_compaction_pipeline(self.name) + def generate_completion_token(self, table_name: str, connector_name: str) -> str: """ Returns a completion token that can be passed to :meth:`.Pipeline.completion_token_status` to diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index bc396cd49f2..119e25a1412 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -1470,6 +1470,9 @@ def get_cluster_event(self, event_id: str, selector: str = "status") -> dict: def rebalance_pipeline(self, pipeline_name: str): self.http.post(path=f"/pipelines/{pipeline_name}/rebalance") + def start_compaction_pipeline(self, pipeline_name: str): + self.http.post(path=f"/pipelines/{pipeline_name}/start_compaction") + def get_checkpoints(self, pipeline_name: str): return self.http.get(path=f"/pipelines/{pipeline_name}/checkpoints") diff --git a/python/tests/platform/test_shared_pipeline.py b/python/tests/platform/test_shared_pipeline.py index 38cc2c1484a..1f67a06d085 100644 --- a/python/tests/platform/test_shared_pipeline.py +++ b/python/tests/platform/test_shared_pipeline.py @@ -92,6 +92,17 @@ def test_get_pipeline_stats(self): assert stats.get("inputs") is not None assert stats.get("outputs") is not None + def test_start_compaction(self): + self.pipeline.start() + self.pipeline.input_json("tbl", [{"id": 1}, {"id": 2}]) + + before = list(self.pipeline.query("SELECT COUNT(*) AS num_rows FROM v0")) + self.pipeline.start_compaction() + self.pipeline.start_compaction() + after = list(self.pipeline.query("SELECT COUNT(*) AS num_rows FROM v0")) + + assert after == before + def test_case_sensitive_views_listen(self): self.pipeline.start_paused()