Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use feldera_types::adapter_stats::{
ConnectorHealth, ExternalControllerStatus, ExternalInputEndpointStatus,
ExternalOutputEndpointStatus,
};
use feldera_types::checkpoint::CheckpointMetadata;
use feldera_types::checkpoint::{CheckpointActivity, CheckpointMetadata};
use feldera_types::coordination::{
self, AdHocCatalog, AdHocTableType, CheckpointCoordination, Completion, StepAction, StepInputs,
StepRequest, StepStatus, TransactionCoordination,
Expand Down Expand Up @@ -822,8 +822,13 @@ impl Controller {
.map(|runtime| runtime.memory_pressure_epoch())
.unwrap_or_default();

let checkpoint_activity = self.checkpoint_activity();
let permanent_checkpoint_errors = self.permanent_suspend_errors();

self.status().to_api_type(
self.can_suspend(),
checkpoint_activity,
permanent_checkpoint_errors,
self.pipeline_complete(),
self.inner.transaction_info.lock().unwrap().clone(),
memory_pressure,
Expand Down Expand Up @@ -901,6 +906,66 @@ impl Controller {
self.inner.checkpoint_receiver.clone()
}

/// Returns the time when the current checkpoint delay started, or `None`
/// if no checkpoint is delayed.
pub fn checkpoint_delay_started(&self) -> Option<DateTime<Utc>> {
*self.inner.checkpoint_delay_started.lock().unwrap()
}

/// Returns the time when the current in-progress checkpoint started
/// writing, or `None` if no checkpoint is in progress.
pub fn checkpoint_started(&self) -> Option<DateTime<Utc>> {
*self.inner.checkpoint_started.lock().unwrap()
}

/// Returns permanent suspend errors if the pipeline fundamentally cannot
/// checkpoint (e.g. storage not configured, unsupported input endpoint),
/// or `None` if checkpointing is possible.
pub fn permanent_suspend_errors(&self) -> Option<Vec<PermanentSuspendError>> {
match self.can_suspend() {
Err(SuspendError::Permanent(reasons)) => Some(reasons),
_ => None,
}
}

/// Computes the current [`CheckpointActivity`] from the coordination watch
/// channel and the timestamp mutexes.
pub fn checkpoint_activity(&self) -> CheckpointActivity {
let coordination = self.checkpoint_watcher().borrow().clone();
match coordination {
Some(
CheckpointCoordination::Delayed(reasons)
| CheckpointCoordination::Barriers(reasons),
) => {
let delayed_since = self
.checkpoint_delay_started()
.expect("delay timestamp should be set when coordination is Delayed/Barriers");
CheckpointActivity::Delayed {
reasons,
delayed_since,
}
}
Some(CheckpointCoordination::Ready) => {
let delayed_since = self
.checkpoint_delay_started()
.expect("delay timestamp should be set when coordination is Ready");
CheckpointActivity::Delayed {
reasons: vec![TemporarySuspendError::Coordination],
delayed_since,
}
}
Some(CheckpointCoordination::InProgress) => {
let started_at = self
.checkpoint_started()
.expect("started timestamp should be set when coordination is InProgress");
CheckpointActivity::InProgress { started_at }
}
None | Some(CheckpointCoordination::Done) | Some(CheckpointCoordination::Error(_)) => {
CheckpointActivity::Idle
}
}
}

/// Returns an object for monitoring progress of transactions.
pub fn transaction_watcher(&self) -> tokio::sync::watch::Receiver<TransactionCoordination> {
self.inner.transaction_receiver.clone()
Expand Down Expand Up @@ -3056,12 +3121,38 @@ impl CircuitThread {

/// Sets the value in `self.checkpoint_sender` to `checkpoint_coordination`,
/// but only if that's a real change. This suppresses lots of duplicate
/// sends.
/// sends. Also maintains the timestamp mutexes for the HTTP-facing
/// `/checkpoint_status` endpoint.
fn set_checkpoint_coordination(
&mut self,
checkpoint_coordination: Option<CheckpointCoordination>,
) {
if *self.checkpoint_sender.borrow() != checkpoint_coordination {
let now = Utc::now();
match &checkpoint_coordination {
Some(
CheckpointCoordination::Delayed(_)
| CheckpointCoordination::Barriers(_)
| CheckpointCoordination::Ready,
) => {
// Only set the delay start time on the first transition
// into a delayed state (not on subsequent reason changes).
// Ready is still "delayed" — waiting for the coordinator.
let mut guard = self.controller.checkpoint_delay_started.lock().unwrap();
if guard.is_none() {
*guard = Some(now);
}
}
Some(CheckpointCoordination::InProgress) => {
*self.controller.checkpoint_delay_started.lock().unwrap() = None;
*self.controller.checkpoint_started.lock().unwrap() = Some(now);
}
_ => {
// None, Done, Error — clear both timestamps.
*self.controller.checkpoint_delay_started.lock().unwrap() = None;
*self.controller.checkpoint_started.lock().unwrap() = None;
}
}
self.checkpoint_sender.send_replace(checkpoint_coordination);
}
}
Expand Down Expand Up @@ -5470,6 +5561,16 @@ pub struct ControllerInner {

/// Is the circuit thread still restoring from a checkpoint (this includes the journal replay phase)?
restoring: AtomicBool,

/// Wall-clock time when the checkpoint entered the Delayed or
/// Barriers state. `None` means "not delayed". Set by the circuit thread
/// in `set_checkpoint_coordination`, read by the HTTP thread.
checkpoint_delay_started: Mutex<Option<DateTime<Utc>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not an expert in Rust concurrency, someone else will have to comment whether this is the right type to use here

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a reasonable choice.


/// Wall-clock time when the checkpoint entered the InProgress
/// state. `None` means "not in progress". Set by the circuit thread in
/// `set_checkpoint_coordination`, read by the HTTP thread.
checkpoint_started: Mutex<Option<DateTime<Utc>>>,
}

impl Drop for ControllerInner {
Expand Down Expand Up @@ -5542,6 +5643,8 @@ impl ControllerInner {
coordination_request: Mutex::new(is_multihost.then_some(StepRequest::new_idle(0))),
coordination_prepare_checkpoint: AtomicBool::new(false),
input_completion_notify: Arc::new(Notify::new()),
checkpoint_delay_started: Mutex::new(None),
checkpoint_started: Mutex::new(None),
}
});

Expand Down
7 changes: 6 additions & 1 deletion crates/adapters/src/controller/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ use feldera_types::{
ExternalInputEndpointStatus, ExternalOutputEndpointMetrics, ExternalOutputEndpointStatus,
ShortEndpointConfig,
},
checkpoint::CheckpointActivity,
config::{FtModel, PipelineConfig},
coordination::Completion,
memory_pressure::MemoryPressure,
suspend::SuspendError,
suspend::{PermanentSuspendError, SuspendError},
time_series::SampleStatistics,
transaction::CommitProgressSummary,
};
Expand Down Expand Up @@ -1196,6 +1197,8 @@ impl ControllerStatus {
pub fn to_api_type(
&self,
suspend_error: Result<(), SuspendError>,
checkpoint_activity: CheckpointActivity,
permanent_checkpoint_errors: Option<Vec<PermanentSuspendError>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this type is awfully specific, is this the only type of error possible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PermanentSuspendError ? Yes, it covers a few possibilities. This is a type of errors that don't ever allow checkpointing the pipeline.

pipeline_complete: bool,
transaction_info: TransactionInfo,
memory_pressure: MemoryPressure,
Expand Down Expand Up @@ -1330,6 +1333,8 @@ impl ControllerStatus {
adapter_stats::ExternalControllerStatus {
global_metrics,
suspend_error: suspend_error.err(),
checkpoint_activity: Some(checkpoint_activity),
permanent_checkpoint_errors,
inputs,
outputs,
}
Expand Down
Loading
Loading