[dbsp] Compaction API #6212
Conversation
Add a `/start_compaction` endpoint, which triggers background compaction of all traces in the circuit. Compaction starts by merging all batches at level 1 and pushing the resulting batch to level 2, regardless of its size; we then do the same at level 2, and so on. Compaction does not stall the circuit. Compaction is useful, for example, to consolidate all negative weights in the pipeline. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
```rust
if let Some(last_level) = self.last_non_empty_slot()
    && last_level > level
{
    self.initiate_compaction_at_level(
        level + 1,
        if new_batch.is_empty() {
            vec![]
        } else {
            vec![new_batch]
        },
    );
```
I think there is a corner case here where `last_level > level` but `level + 1` is empty before we add the new batch. (Even if it's nonempty after we add the new batch, that's not an interesting merge, because it would be a merge of one batch.)
This corner case is unlikely in steady state but it is likely soon after a previous compaction that produces a single level-3 or level-4 batch followed by a bunch of level-0 batches coming in from input batches.
I think we'll need some kind of iteration here that goes up a level accumulating batches until there are at least two batches or we hit the last populated level.
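The iteration suggested above might look something like this sketch (a toy model with hypothetical names such as `collect_merge_candidates`, not the actual spine code):

```rust
// A "batch" is modeled as a vector of (key, weight) pairs.
type Batch = Vec<(u64, i64)>;

/// Walk up the levels starting at `start`, accumulating batches, until
/// we either have at least two batches to merge or we reach the last
/// populated level. Returns the level we stopped at and the candidates.
fn collect_merge_candidates(
    levels: &[Vec<Batch>],
    start: usize,
) -> (usize, Vec<Batch>) {
    // Index of the last non-empty level, if any.
    let last = levels
        .iter()
        .rposition(|l| !l.is_empty())
        .unwrap_or(start);
    let mut pending: Vec<Batch> = Vec::new();
    let mut level = start;
    loop {
        pending.extend(levels[level].iter().cloned());
        // Stop once a real (multi-batch) merge is possible, or once
        // there are no higher populated levels to pull batches from.
        if pending.len() >= 2 || level >= last {
            break;
        }
        level += 1;
    }
    (level, pending)
}
```

This avoids scheduling degenerate single-batch merges while still terminating at the last populated level.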
Hmm, after reading the rest of the code I'm just confused about how these corner cases end up working.
@blp, I think this is ok. We may indeed end up with 0 or 1 batches at the next level, which will cause the merger thread to instantly push these batches to the next level again as it handles the compaction request. It's a tiny bit less efficient, but why would it cause problems?
```rust
"",
Some(Duration::from_secs(120)),
None,
)
```
New user-visible API endpoint (`/pipelines/{pipeline_name}/start_compaction`) with no documentation. The Documentation checkbox in the PR checklist is unchecked. Please add a docs page (or a section under the existing operations/compaction docs) covering: what compaction does, when to use it, the endpoint, and the `fda pipeline start-compaction` CLI command.
```rust
for record in 0..RECORDS_PER_BATCH {
    // Use disjoint key ranges per trace so the output validation can
    // reconstruct the expected integral exactly.
    let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH
```
The `thread::sleep(Duration::from_millis(100))` between the two `start_compaction()` calls is a potential flakiness source. On a heavily loaded CI runner, 100ms may not be enough for any background merge work to start, making the idempotency check vacuous. Consider polling a profile metric (e.g. `COMPACTION_STATE`) to confirm compaction is actually in progress before issuing the second call, or document why 100ms is sufficient.
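A generic poll-with-timeout helper along these lines could replace the fixed sleep. The `probe` closure would query the profile metric in the real test; the helper name and its use are assumptions, not the existing test-harness API:

```rust
use std::time::{Duration, Instant};

/// Poll `probe` until it returns true or `timeout` elapses. Returns
/// whether the condition was observed. Unlike a fixed sleep, this
/// succeeds as soon as the condition holds and only fails after the
/// full timeout, which tolerates slow CI runners.
fn wait_until(timeout: Duration, mut probe: impl FnMut() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if probe() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
```

The test would then call something like `wait_until(Duration::from_secs(30), || compaction_in_progress())` before issuing the second `start_compaction()` call, where `compaction_in_progress` is a hypothetical check against the pipeline's profile.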
Describe Manual Test Plan
Customer reports that compaction improved the performance of a pipeline with many negative weights.
Checklist