
[dbsp] Compaction API. #6212

Open
ryzhyk wants to merge 1 commit into
main from compaction-main

Conversation


@ryzhyk ryzhyk commented May 10, 2026

Add a `/start_compaction` endpoint, which triggers background compaction of all traces in the circuit. Compaction starts by merging all batches at level 1 and pushing the resulting batch to level 2, regardless of its size; we then do the same at level 2, and so on. Compaction does not stall the circuit.

Compaction is useful, for example, to consolidate all negative weights in the pipeline.
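To make the level-by-level scheme above concrete, here is a toy model of leveled compaction. All types and names are hypothetical illustrations, not the actual DBSP implementation: a "batch" is a sorted run of `(key, weight)` pairs, and a trace keeps its batches grouped by level. Each pass merges every batch at a level and pushes the result to the next level regardless of its size; merging consolidates weights, so entries whose weights sum to zero disappear, which is why compaction helps pipelines carrying many negative weights.

```rust
use std::collections::BTreeMap;

// (key, weight) pairs, sorted by key. Hypothetical stand-in for a DBSP batch.
type Batch = Vec<(u64, i64)>;

/// Merge a set of batches, consolidating weights and dropping entries
/// whose weights cancel to zero.
fn merge(batches: &[Batch]) -> Batch {
    let mut acc: BTreeMap<u64, i64> = BTreeMap::new();
    for batch in batches {
        for &(k, w) in batch {
            *acc.entry(k).or_insert(0) += w;
        }
    }
    // Fully cancelled entries (e.g. a +1 retraction pair) vanish here.
    acc.into_iter().filter(|&(_, w)| w != 0).collect()
}

/// Compact every level in turn: merge all batches at a level and push the
/// result up one level, regardless of its size.
fn start_compaction(levels: &mut Vec<Vec<Batch>>) {
    for level in 0..levels.len() {
        if levels[level].is_empty() {
            continue;
        }
        let merged = merge(&levels[level]);
        levels[level].clear();
        if level + 1 == levels.len() {
            levels.push(Vec::new());
        }
        if !merged.is_empty() {
            levels[level + 1].push(merged);
        }
    }
}
```

After a full pass, all surviving data ends up in a single batch at the level just past the last originally populated one; in the real system the merging happens on a background thread so the circuit is not stalled.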

Describe Manual Test Plan

A customer reports that compaction improved the performance of a pipeline with many negative weights.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Add a `/start_compaction` endpoint, which triggers background compaction of all
traces in the circuit. Compaction starts by merging all batches at level 1 and
pushing the resulting batch to level 2, regardless of its size; we then do the
same at level 2, etc. Compaction does not stall the circuit.

Compaction is useful, for example, to consolidate all negative weights in the
pipeline.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk requested a review from blp May 10, 2026 22:00
@ryzhyk ryzhyk added the DBSP core Related to the core DBSP library label May 10, 2026
Comment on lines +481 to +491
if let Some(last_level) = self.last_non_empty_slot()
&& last_level > level
{
self.initiate_compaction_at_level(
level + 1,
if new_batch.is_empty() {
vec![]
} else {
vec![new_batch]
},
);
Member


I think there is a corner case here where `last_level > level` but `level + 1` is empty before we add the new batch. (Even if it's nonempty after we add the new batch, that's not an interesting merge because it would be a merge of one batch.)

This corner case is unlikely in steady state, but it is likely soon after a previous compaction that produces a single level-3 or level-4 batch, followed by a bunch of level-0 batches coming in from input batches.

I think we'll need some kind of iteration here that goes up a level accumulating batches until there are at least two batches or we hit the last populated level.
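The suggested iteration could be sketched roughly as follows. This is a hypothetical helper, not the actual DBSP code: levels are modelled only as batch counts, `carried` is the number of batches being pushed up from the level just compacted, and the function climbs until it has gathered at least two batches (a meaningful merge) or reaches the last populated level.

```rust
/// Walk upward from `start`, accumulating batches found at each level,
/// and return the level at which a multi-batch merge becomes possible,
/// stopping no later than the last populated level.
fn choose_compaction_level(batch_counts: &[usize], start: usize, carried: usize) -> usize {
    // Highest level that currently holds any batches.
    let last_populated = batch_counts
        .iter()
        .rposition(|&n| n > 0)
        .unwrap_or(start);
    let mut acc = carried;
    let mut level = start;
    while level < last_populated {
        acc += batch_counts[level];
        if acc >= 2 {
            break; // enough batches for a real (multi-batch) merge
        }
        level += 1;
    }
    level
}
```

In the corner case described above (one carried batch, empty intermediate levels, a lone batch higher up), this skips the empty levels and lands on the last populated one, so the carried batch actually merges with something.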

Member


Hmm, after reading the rest of the code I'm just confused about how these corner cases end up working.

Contributor Author


@blp , I think this is ok. We may indeed end up with 0 or 1 batches at the next level, which will cause the merger thread to instantly push these batches to the next level again as it handles the compaction request. It's a tiny bit less efficient, but why would it cause problems?
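A toy illustration of the behavior described in this reply (a hypothetical model, not DBSP itself): when a level holds zero or one batches, "merging" it is a no-op that simply forwards whatever is there upward, so the request cascades to the top in a single pass over the levels. It costs a few extra trivial merges on the merger thread but cannot loop or lose data.

```rust
/// Push every level's batches up one level in a single pass, counting
/// the trivial (zero- or one-batch) "merges" along the way.
fn cascade(mut levels: Vec<Vec<&'static str>>) -> (Vec<Vec<&'static str>>, usize) {
    let mut trivial_merges = 0;
    for level in 0..levels.len() - 1 {
        let batches = std::mem::take(&mut levels[level]);
        if batches.len() <= 1 {
            // No real merge work: just forward the (possibly empty)
            // contents to the next level.
            trivial_merges += 1;
        }
        levels[level + 1].extend(batches);
    }
    (levels, trivial_merges)
}
```

A single batch at level 0 ends up at the last level after a linear number of trivial merges, which matches the "a tiny bit less efficient, but harmless" argument.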


@mythical-fred mythical-fred left a comment


Two blockers, see inline.

"",
Some(Duration::from_secs(120)),
None,
)


New user-visible API endpoint (`/pipelines/{pipeline_name}/start_compaction`) with no documentation. The Documentation checkbox in the PR checklist is unchecked. Please add a docs page (or a section under the existing operations/compaction docs) covering: what compaction does, when to use it, the endpoint, and the `fda pipeline start-compaction` CLI command.

for record in 0..RECORDS_PER_BATCH {
// Use disjoint key ranges per trace so the output validation can
// reconstruct the expected integral exactly.
let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH


The `thread::sleep(Duration::from_millis(100))` between the two `start_compaction()` calls is a potential flakiness source. On a heavily loaded CI runner, 100ms may not be enough for any background merge work to start, making the idempotency check vacuous. Consider polling a profile metric (e.g. `COMPACTION_STATE`) to confirm compaction is actually in progress before issuing the second call, or document why 100ms is sufficient.
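The polling pattern suggested above could look like this sketch. The probe closure and its metric are hypothetical placeholders for the real `COMPACTION_STATE` check: instead of a fixed 100ms sleep, the test polls against a deadline, so a slow CI runner simply polls longer rather than failing.

```rust
use std::time::{Duration, Instant};

/// Poll `probe` until it returns true or `timeout` elapses.
/// Returns whether the condition was ever observed.
fn wait_until(mut probe: impl FnMut() -> bool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if probe() {
            return true;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    probe() // one last check at the deadline
}
```

In the test, the second `start_compaction()` call would then be gated on something like `assert!(wait_until(|| compaction_in_progress(), Duration::from_secs(30)))` rather than a bare sleep (`compaction_in_progress` being whatever probe the real metric supports).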
