Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9151adf
[docs] concurrent bootstrapping design.
ryzhyk Jun 17, 2026
e4a640a
[dbsp] Add Trace::fork and Trace::frontier
ryzhyk Jun 11, 2026
bb64523
[dbsp] Factor the chunked replay drain into a shared ReplayState helper
ryzhyk Jun 11, 2026
23a4675
[dbsp] Add recorder operator, attached to every replayable stream
ryzhyk Jun 11, 2026
891876e
[dbsp] Add operator state-transfer and sync-replay primitives
ryzhyk Jun 11, 2026
6736380
[dbsp] Run a second "bootstrap" circuit on the worker threads
ryzhyk Jun 11, 2026
9527b27
[dbsp] Concurrent restore: serve old views while a bootstrap circuit …
ryzhyk Jun 11, 2026
0ade16a
[dbsp] Synchronization transaction and cutover for concurrent bootstrap
ryzhyk Jun 11, 2026
c4e3b0b
[dbsp] Add persistent-id variants of set input, linear aggregate, wei…
ryzhyk Jun 12, 2026
d4b224d
[dbsp] Test concurrent bootstrapping across circuit topologies
ryzhyk Jun 11, 2026
80b9cba
[dbsp] Support balanced joins in concurrent bootstrap
ryzhyk Jun 13, 2026
4dd84ee
[dbsp] Concurrent-bootstrap torture tests for balanced joins
ryzhyk Jun 13, 2026
9e52a05
[dbsp] Support concurrent bootstrap of recursive views
ryzhyk Jun 14, 2026
004bc66
concurrent bootstrap: controller, manager, and SDK integration
ryzhyk Jun 15, 2026
2efcc31
[dbsp] preserve new-view output across concurrent-bootstrap cutover
ryzhyk Jun 15, 2026
91fb213
[fda] support concurrent bootstrap and its runtime statuses
ryzhyk Jun 16, 2026
61e453a
[py] integration tests for concurrent bootstrapping
ryzhyk Jun 17, 2026
c351706
[adapters] observability for concurrent bootstrapping (logging + metr…
ryzhyk Jun 17, 2026
7478601
[qa] add a concurrent-bootstrapping mode to the workload test
ryzhyk Jun 17, 2026
1e91f6a
[docs] document concurrent bootstrapping
ryzhyk Jun 17, 2026
4945a8a
Clippy, formatting.
ryzhyk Jun 18, 2026
eb94db4
[dbsp] Simplification: remove support_state_transfer.
ryzhyk Jun 18, 2026
8f316a8
[ci] apply automatic fixes
feldera-bot Jun 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 363 additions & 11 deletions crates/adapters/src/controller.rs

Large diffs are not rendered by default.

65 changes: 64 additions & 1 deletion crates/adapters/src/controller/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use feldera_types::{
memory_pressure::MemoryPressure,
suspend::{PermanentSuspendError, SuspendError},
time_series::SampleStatistics,
transaction::CommitProgressSummary,
transaction::{CommitProgressSummary, ConcurrentBootstrapProgress},
};
use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -177,9 +177,22 @@ pub struct GlobalControllerMetrics {
/// new and modified views.
bootstrap_in_progress: AtomicBool,

/// A concurrent bootstrap is in its backfill phase: new/modified views
/// replay in the background while the pre-existing views stay live and
/// inputs keep flowing.
concurrent_backfill_in_progress: AtomicBool,

/// A concurrent bootstrap is in its synchronize (cutover) phase: inputs are
/// paused while recorded deltas drain into the new views before the state
/// transfer.
concurrent_synchronize_in_progress: AtomicBool,

/// Transaction commit progress, if a transaction is committing.
pub commit_progress: Mutex<Option<CommitProgressSummary>>,

/// Progress of a concurrent bootstrap, if one is in progress.
pub concurrent_bootstrap_progress: Mutex<Option<ConcurrentBootstrapProgress>>,

/// Time at which the pipeline process started, in seconds since the epoch.
pub start_time: DateTime<Utc>,

Expand Down Expand Up @@ -300,7 +313,10 @@ impl GlobalControllerMetrics {
Self {
state: Atomic::new(PipelineState::Paused),
bootstrap_in_progress: AtomicBool::new(false),
concurrent_backfill_in_progress: AtomicBool::new(false),
concurrent_synchronize_in_progress: AtomicBool::new(false),
commit_progress: Mutex::new(None),
concurrent_bootstrap_progress: Mutex::new(None),
start_time,
incarnation_uuid,
initial_start_time,
Expand Down Expand Up @@ -409,6 +425,25 @@ impl GlobalControllerMetrics {
.store(bootstrap_in_progress, Ordering::Release);
}

fn concurrent_backfill_in_progress(&self) -> bool {
self.concurrent_backfill_in_progress.load(Ordering::Acquire)
}

fn set_concurrent_backfill_in_progress(&self, value: bool) {
self.concurrent_backfill_in_progress
.store(value, Ordering::Release);
}

fn concurrent_synchronize_in_progress(&self) -> bool {
self.concurrent_synchronize_in_progress
.load(Ordering::Acquire)
}

fn set_concurrent_synchronize_in_progress(&self, value: bool) {
self.concurrent_synchronize_in_progress
.store(value, Ordering::Release);
}

fn set_step_requested(&self) -> bool {
self.step_requested.swap(true, Ordering::AcqRel)
}
Expand All @@ -417,6 +452,10 @@ impl GlobalControllerMetrics {
*self.commit_progress.lock().unwrap() = commit_progress;
}

pub fn set_concurrent_bootstrap_progress(&self, progress: Option<ConcurrentBootstrapProgress>) {
*self.concurrent_bootstrap_progress.lock().unwrap() = progress;
}

pub fn update_output_stall_start(&self, stalled: bool) {
let mut output_stall_start = self.output_stall_start.lock().unwrap();
if stalled != output_stall_start.is_some() {
Expand Down Expand Up @@ -739,6 +778,24 @@ impl ControllerStatus {
.set_bootstrap_in_progress(bootstrap_in_progress);
}

pub fn concurrent_backfill_in_progress(&self) -> bool {
self.global_metrics.concurrent_backfill_in_progress()
}

pub fn set_concurrent_backfill_in_progress(&self, value: bool) {
self.global_metrics
.set_concurrent_backfill_in_progress(value);
}

pub fn concurrent_synchronize_in_progress(&self) -> bool {
self.global_metrics.concurrent_synchronize_in_progress()
}

pub fn set_concurrent_synchronize_in_progress(&self, value: bool) {
self.global_metrics
.set_concurrent_synchronize_in_progress(value);
}

pub fn request_step(&self, circuit_thread_unparker: &Unparker) {
let old = self.global_metrics.set_step_requested();
if !old {
Expand Down Expand Up @@ -1355,6 +1412,12 @@ impl ControllerStatus {
0
},
commit_progress: self.global_metrics.commit_progress.lock().unwrap().clone(),
concurrent_bootstrap_progress: self
.global_metrics
.concurrent_bootstrap_progress
.lock()
.unwrap()
.clone(),
transaction_initiators: ctx.transaction_info.initiators.to_api_type(),
start_time: self.global_metrics.start_time,
incarnation_uuid: self.global_metrics.incarnation_uuid,
Expand Down
Loading
Loading