Skip to content

Concurrent bootstrapping.#6502

Open
ryzhyk wants to merge 23 commits into
mainfrom
concurrent_bootstrap
Open

Concurrent bootstrapping.#6502
ryzhyk wants to merge 23 commits into
mainfrom
concurrent_bootstrap

Conversation

@ryzhyk

@ryzhyk ryzhyk commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Concurrent bootstrapping keeps old views receiving incremental updates while new
and modified views are bootstrapped. The circuit is cloned into two copies: copy 1
runs the old operators as usual; copy 2 bootstraps the new parts. Copy 1 also
records changes to the boundary streams that cross between the copies; those
records bring copy 2 up to date in the final stage. The operator sets activated in
the two copies may overlap.

Most of the code was generated by Claude with lots of guidance, reviewing, and
thorough test coverage.

Describe Manual Test Plan

This change should not affect existing functionality. The plan is to merge it and
then test/benchmark on QA pipelines + customer workloads.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

@ryzhyk ryzhyk added DBSP core Related to the core DBSP library Pipeline manager Pipeline manager (API, API server, runner, compiler server) connectors Issues related to the adapters/connectors crate CLI Feldera CLI (fda) - related issues python Pull requests that update python code labels Jun 18, 2026
ryzhyk added 22 commits June 18, 2026 13:05
Design for concurrent bootstrapping: keep old views serving incremental
updates while new and modified views backfill in a second copy of the
circuit, then atomically cut over.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
fork() creates a logical copy of a trace in O(#batches): the copy shares
the source's immutable Arc'd batches, inheriting its name, compaction
frontier, retention filters, and dirty flag.

Because batch Arcs can now outlive their spine, Spine::consolidate()
deep-copies shared batches (Arc::unwrap_or_clone) instead of panicking,
and Spine::restore() clears the merger's filter copies so the spine-local
filter fields never desynchronize from the locked state that merges
actually enforce.

Concurrent bootstrapping will use fork() to seed a second circuit copy with
the live circuit's integrals.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Factor out common logic shared by Z1Trace and AccumulateZ1Trac.

Behavior-identical; the synchronization replay of concurrent
bootstrapping reuses the same helper instead of adding a third copy.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The recorder is a gated sink that accumulates the deltas flowing on a
stream across many transactions.  During concurrent bootstrapping, the
deltas applied to a boundary stream while the bootstrap circuit replays
its (frozen) integral must be captured for the later synchronization
transaction; the recorder is the operator that captures them.

The recorder consumes its stream with the new
OwnershipPreference::YIELD_OWNERSHIP, and the scheduler's ownership
constraints order such consumers before owned-input consumers.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Three new Operator methods:

* swap_state(&mut Self) exchanges an operator's persistent state with
  another instance of the same type.  The cutover phase of a concurrent
  bootstrap uses it to install state computed by the bootstrap circuit
  into the corresponding operator of the live circuit.

* supports_state_transfer() reports whether swap_state works, so a
  concurrent bootstrap can refuse up front instead of failing at
  cutover after the backfill is done.

* start_sync_replay() makes a replay source replay a
  caller-supplied trace -- the changes a recorder captured on a
  boundary stream -- instead of its own state, reusing the chunked
  replay drain..

Adds RuntimeError::BootstrapCircuit for unsupported-transfer errors.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Concurrent bootstrapping backfills new operators in a separate circuit
copy while the main circuit keeps serving. Each worker thread
now holds an optional second CircuitHandle, built by re-running a
the circuit constructor.

The bootstrap circuit is driven in lockstep on all workers through new
broadcast commands (create / start / commit / step / destroy), keeping
the invariant that every worker executes the same command sequence.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
…backfills

The new restore_concurrent() function prepares the live half of a concurrent
bootstrap: it restores operator state, arms a recorder on every boundary
stream, and schedules the operators.

DBSPHandle::start_concurrent_bootstrap orchestrates: concurrent restore
on the main circuit, bootstrap-circuit creation restored from the same
checkpoint, and the background backfill transaction.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The two phases that complete a concurrent bootstrap once the background
backfill commits:

- Synchronization replays the changes that the main circuit recorded on
  the boundary streams into the bootstrap circuit. One lockstep command
  drains the recorders, hands each spine to the matching replay source,
  and opens the synchronization transaction; the caller pumps
  step_bootstrap_circuit until the commit completes.

- Cutover installs the bootstrapped state into the main circuit,
  reactivates the full schedule, and destroys the bootstrap circuit.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
…ghted count, topk_asc

Add add_input_set_persistent(), aggregate_linear_persistent(), and
weighted_count_persistent(), following the existing *_persistent
convention; the unnamed variants now delegate to them.

Found by the concurrent-bootstrap topology tests, which checkpoint
hand-written circuits using all three operators.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Generic test harness for concurrent bootstrapping + tests for a range of
topologies covering all operators.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
A balanced (rebalancing) join in the bootstrap region used to force a
fallback to stop-the-world bootstrap, because a balanced join's state
is partitioned across workers by a cluster-wide `PartitioningPolicy` that
neither circuit copy's balancer reproduces in the other.  Balanced join
is becoming the default join, so the fallback is no longer acceptable.

The key to the fix is to make sure that the primary and bootstrapping
circuits don't end up with incompatible balancing policies during bootstrapping.
We avoid it by freezing the balancing policies of join clusters shared
between the two circuits.

Design: the "Balanced joins" section of docs/design/concurrent_bootstrapping.md.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Four complex balanced-join topologies, each stressing a different way a
balanced cluster interacts with the bootstrap boundary:

- chain cluster: J12, J23, J34 fuse s1..s4 into one cluster; a new closing
  join J14 adds a constraint to that boundary-spanning cluster.
- stacked joins: a new balanced join over the outputs of two kept balanced
  joins, which pulls both kept clusters across the boundary (re-evaluated in
  the bootstrap copy) and freezes them.
- balanced left join: a new balanced LEFT join sharing its left stream with
  a kept inner balanced join, with an oversized left table to exercise the
  unmatched-left leg.
- shared cluster under heavy skew: the boundary-spanning {s1,s2,s3,s4}
  cluster fed keys that all hash to one worker, so the cluster's data is
  maximally skewed across shards throughout the bootstrap.

All four bootstrap concurrently and are validated end-to-end by the existing
harness (per-transaction old-view equality during backfill, new-view silence
until cutover, full all-view equality after cutover, and a final retraction
probe of accumulated state).

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
A new recursive view (a nested subcircuit) now bootstraps concurrently
instead of falling back to the classic stop-the-world bootstrap.

A subtle aspect of this is that the internal clock maintained by the
operators like join must be transferred together with the operator's state.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Drive concurrent bootstrapping from the controller and expose it through the
manager and the Python SDK.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
A concurrently bootstrapped view's backfilled contents were lost: at cutover
`cutover_from` swapped the view's integral into the live circuit but never
delivered the view's accumulated output to the live `OutputHandle`, so the
output connector emitted only post-cutover live deltas — never the backfilled
state. The new view's first output must equal the full accumulated view, exactly
as a from-scratch run produces it.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Add a `--concurrent-bootstrap` flag to `fda start`, `fda approve`,
`fda restart`, and `fda shell --start`.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Add end-to-end tests in python/tests/platform/test_concurrent_bootstrapping.py
covering concurrent bootstrap of a modified pipeline.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
…ics)

Report the progress of a concurrent bootstrap, mirroring how regular
transaction commits are logged and surfaced.

Logging: in the concurrent-bootstrap pump, log the current phase
(concurrent bootstrapping / synchronizing / finalizing) and, while the
bootstrap circuit is committing a transaction, its commit progress -- the
backfill transaction's commit (after inputs are replayed) during
`ConcurrentBootstrapping`, then the synchronization transaction during
`Synchronizing`.

Stats: add `concurrent_bootstrap_progress` to GlobalControllerMetrics.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Add `--mode concurrent-bootstrapping` to `test_tpch.py`. It runs exactly like
`checkpoint` mode but each restart that introduces new views bootstraps them
concurrently (`concurrent_bootstrap=true`).

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Add a "Concurrent bootstrapping" section to the bootstrapping guide.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
This method was added to the operator trait to identify operators that cannot
be concurrently bootstrapped because they don't support state transfer. After a
series of improvements, all operators now support state transfer, so we
simplified it away.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk force-pushed the concurrent_bootstrap branch from 81d7c56 to eb94db4 Compare June 18, 2026 20:06
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@mihaibudiu

Copy link
Copy Markdown
Contributor

If this works, is the plan to remove the other bootstrapping code and keep only this one?

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No clear blockers — large, well-tested patch. A few design questions and doc nits inline. The two non-trivial ones I would like answered before final approval:

  1. Recorders apply no retention filters by design, so a long backfill on a high-throughput pipeline pays unbounded recorder memory until the sync transaction drains. The experimental warning protects users; the iterative-catchup variant from §3 of the design feels like the right follow-up. Worth tracking explicitly.

  2. The synchronization transaction collapses every recorded boundary delta into a single bootstrap-circuit transaction. For root-scope operators this is incremental-equivalent; I am asking inline on cutover_from how it composes with the nested-circuit clock realignment the recursive-view test exercises.

self.executor.prepare(&self.circuit, None)?;

Ok(())
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The synchronization transaction collapses every recorded boundary delta into a single bootstrap-circuit transaction, after which cutover_from swap_state_withs the backfilled child circuit in and ChildNode::swap_state_with realigns the nested clock with set_time(other.circuit.time()). For a top-level operator this is clearly incremental-equivalent (timestamps are ()). For the recursive-view case the nested circuit ran inside the bootstrap copy n times during backfill plus once for the sync, and the realignment moves the live copy's nested clock forward to that count. My question: is there an invariant that the recorder-replayed boundary delta is always idempotent w.r.t. nested-scope timestamps, or does correctness here rely on the boundary stream only ever feeding delta0-style entry points (so the nested operators never see the sync transaction as anything but a fresh outer tick)? The recursive-view test passes, but I would feel better with a one-paragraph note here pinning down the invariant — nested-scope state transfer is the easiest place for a future change to introduce a silent off-by-one in the recursion's fixpoint.

//! exchange-id allocation — identical across workers. A disabled recorder
//! holds no state and its evaluation is a no-op; in particular it does not
//! allocate a [`Spine`] (whose background merge tasks are costly to keep
//! idle).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module doc is explicit that recorders apply no retention filters and accumulate every delta on every replayable stream that turns out to be a boundary stream. For a long backfill on a high-throughput pipeline that is unbounded memory growth until the sync transaction drains. The Caveats section of the design doc mentions this, and §3 calls out the iterative-catchup variant as the fix. Worth either (a) adding a tracking issue link here so it doesn't get lost, or (b) a one-line note in modifying.md warning users that recorder memory is proportional to backfill duration × boundary-stream throughput. The current docs imply only the cutover window is short; they don't warn about the backfill's resident-memory footprint.

/// during bootstrapping is silently lost at cutover.
///
/// The default is a no-op: most operators hold no persistent state.
fn swap_state(&mut self, other: &mut Self) -> Result<(), Error>

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc says "operators that implement checkpoint/restore must implement this one (with the exception of operators whose checkpoint is an empty marker file), or the state they computed during bootstrapping is silently lost at cutover." That is the right invariant, but the default impl is a no-op, so the compiler will not enforce it: a future operator that gains real checkpointable state but forgets to override swap_state will silently drop it across a concurrent cutover with no warning.

Given blp's rule from PR #6075 on "trait methods — default vs. forced impl" (if every implementor must make a conscious choice, don't provide a default), would it be reasonable to either (a) make this method non-defaulted, or (b) add a supports_state_transfer-style canary that a debug-assert can check against the presence of a non-trivial checkpoint? I am aware that the immediately-preceding commit (eb94db4) removed the previous supports_state_transfer precisely because all operators now support transfer — and that gain is real — but the safety net is gone too. Even just a doc-test asserting the invariant in CI would be enough.

// The bootstrap circuit has no connector reading this view, so its
// paired accumulator would otherwise stay disabled and discard the
// view's contents. Force-enable it once. The accumulator samples
// its enable count on the first non-empty batch of a transaction, and

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions about the enable-count bump:

  1. The comment says "The accumulator samples its enable count on the first non-empty batch of a transaction, and the backfill transaction has not started yet." That's a real temporal ordering invariant. Where is the invariant enforced? start_bootstrap_output_caching is called from prepare_replay after clear_state but before the bootstrap transaction starts, so it holds today — but it's the kind of thing a future refactor could quietly break. A short comment naming the call site that establishes the ordering would help.

  2. Ordering::AcqRel on a single-counter fetch_add is stronger than needed; per Gerd's note on counters (2026-03-27), Ordering::Relaxed would do here — and possibly with CachePadded if this counter is also written on the main copy's hot path. Soft.

// Transfer the cached output from the bootstrap circuit (`other`) to
// this live operator. Only the bootstrap circuit caches, so `self`'s
// cache is empty going in; afterwards this operator holds the
// backfilled output and `other`'s state is irrelevant (it is dropped).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: the comment says "other's state is irrelevant (it is dropped)". After std::mem::swap other.cache holds whatever self.cache had before the call — for the live circuit's operator that is None (well-explained), but the swap is still a swap, not a take. If the contract is "this operator is dropped immediately after", asserting that more concretely (e.g. debug_assert!(other.cache.is_none() || self.caching), or simply self.cache = other.cache.take() with a debug_assert!(self.cache.is_none()) guard up front) would make the cutover ordering explicit. Optional.

full contents of new and modified views to input connectors, and returns to the
`Running` state. During this phase the pipeline reports `Synchronizing` runtime status.

Concurrent bootstrapping is *mutually exclusive with `silent_bootstrap`**. Starting a

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Markdown nit: Concurrent bootstrapping is *mutually exclusive with silent_bootstrap**. has unbalanced emphasis — one leading * and two trailing **. Should be **...** (bold) or *...* (italics).

If the user modifies the implementation of a UDF without changing its signature, they need to either clear the
state of the pipeline or rename the UDF to trigger the bootstrapping of any views that depend on the modified UDF.

### Limitation 5: Concurrent bootstrapping with modified tables is not yet supported

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Concurrent bootstrapping is rejected if the modified pipeline contains new or modified tables." This understates the actual restriction in concurrent_restore_refusal: the refusal is triggered by any input operator that is not is_deterministic_source in the bootstrapped region — i.e., a new view that reads an unmaterialized, previously-unconsumed table. A modified materialized table is fine (the test_concurrent_bootstrap_modified_view Python test exercises modifying a view; modified-table semantics are covered by the materialization check). Could you tighten the wording to match the engine refusal — something like "if the bootstrapped region reaches an input stream that has no checkpointed integral to replay (e.g., a new view over an unmaterialized table that no existing view consumed)"?

cluster is fully active in the live copy and an active-node test would miss it.
Freezing pins kept streams to their checkpoint policy while leaving genuinely new
streams (whose policy is `None` in the live copy) free for the bootstrap copy to
choose. An unsatisfiable boundary-spanning cluster still falls back, as do nested

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"An unsatisfiable boundary-spanning cluster still falls back, as do nested circuits and boundary streams with no replay source." The [dbsp] Support concurrent bootstrap of recursive views commit later in this PR removed the nested-circuit fallback for the recursive-view case; this paragraph predates that fix and is now wrong about nested circuits. Recommend dropping "as do nested circuits" here (or qualifying it — e.g., nested circuits whose internal clock cannot be realigned, if any such case remains).

// source's merges actually enforce.
let (name, frontier, (key_filter, value_filter), batches) = {
let state = self.merger.state.lock().unwrap();
(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit on the locking: fork() takes merger.state.lock() once for a consistent snapshot of name/frontier/filters/batches (good), then drops it and proceeds to construct the new spine without the lock. Between the unlock and the for batch in batches insertion, a background merge in self can complete, replacing some of the Arc'd batches we just snapshotted with a merged result. The fork still receives the pre-merge batches via the Arc clones, which is the intended fork-point semantics. But the dirty flag we then set last (fork.dirty = self.dirty) reads self.dirty outside the lock — a concurrent insert into self between the snapshot and this load could leave the fork marked clean while it holds dirty pre-merge state. Probably benign because the only writer to self.dirty is the same worker thread (no concurrency), but it's worth pinning that down in the comment if so.

/// Panics if the recorded contents were lost to a cancelled evaluation
/// (see `RecordingState`): recording silently losing data would
/// corrupt the bootstrap synchronization that consumes it.
pub fn stop_recording(&self) -> Option<Spine<B>> {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop_recording panics if the spine was lost to a cancelled evaluation mid-insert ("recorder lost its contents: an evaluation was cancelled mid-insert"). This is correct (silent loss would corrupt the bootstrap), but the panic propagates as a worker thread panic, which the runtime then surfaces as RuntimeError::WorkerPanic. For a control-plane bug where this can be hit reproducibly (cancelled future during evaluation), an error-return rather than a panic would let pump_concurrent_bootstrap_inner route it through the existing fatal-failure path instead of taking down the whole worker. Soft.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLI Feldera CLI (fda) - related issues connectors Issues related to the adapters/connectors crate DBSP core Related to the core DBSP library Pipeline manager Pipeline manager (API, API server, runner, compiler server) python Pull requests that update python code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants