From b00816cdf381fd104a2f86cc6c6fef69a41ce1fb Mon Sep 17 00:00:00 2001 From: Swanand Mulay <73115739+swanandx@users.noreply.github.com> Date: Tue, 16 Jun 2026 11:41:35 +0530 Subject: [PATCH 1/2] [adapters] checkpoint-suspend on SIGTERM Infra-initiated termination (node drain, eviction) doesn't involve the runner, so the pipeline does not stop gracefully. To solve this, we add an env var FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM: when set, SIGTERM runs the same checkpoint-and-suspend as /suspend before stopping. disabled by default. Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com> --- crates/adapters/Cargo.toml | 2 +- crates/adapters/src/server.rs | 119 ++++++++++++++++-- .../docs/operations/clean-shutdown.md | 52 ++++++++ 3 files changed, 162 insertions(+), 11 deletions(-) create mode 100644 docs.feldera.com/docs/operations/clean-shutdown.md diff --git a/crates/adapters/Cargo.toml b/crates/adapters/Cargo.toml index 2e998652ea2..be0631fb5a1 100644 --- a/crates/adapters/Cargo.toml +++ b/crates/adapters/Cargo.toml @@ -109,7 +109,7 @@ futures-util = { workspace = true } proptest = { workspace = true, optional = true } proptest-derive = { workspace = true, optional = true } clap = { workspace = true } -tokio = { workspace = true, features = ["sync", "macros", "fs", "rt"] } +tokio = { workspace = true, features = ["sync", "macros", "fs", "rt", "signal"] } utoipa = { workspace = true } chrono = { workspace = true, features = ["serde"] } colored = { workspace = true } diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index d57f8d5bc02..61514979532 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -111,7 +111,7 @@ use std::{ }; use tokio::spawn; use tokio::sync::Notify; -use tokio::task::spawn_blocking; +use tokio::task::{JoinHandle, spawn_blocking}; use tokio::time::{sleep, timeout}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::WatchStream; @@ -816,6 +816,21 @@ pub fn run_server( config.global.workers as usize }; + // Deployment-wide opt-in: checkpoint-and-suspend on `SIGTERM` instead of a + // plain graceful HTTP shutdown. Off unless the env var is truthy. + let clean_shutdown_on_sigterm = std::env::var("FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM") + .map(|value| { + matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) + }) + .unwrap_or(false); + + // Clone for the signal task before `state` moves into the server factory. + #[cfg(unix)] + let signal_state = clean_shutdown_on_sigterm.then(|| state.clone()); + let server = HttpServer::new({ move || { let state = state.clone(); @@ -875,6 +890,15 @@ pub fn run_server( // The default in actix is 30s. We may consider making this configurable. .shutdown_timeout(10); + // Stop actix from shutting down on signals so our handler (below) can + // checkpoint first. Unix-only, since that handler uses `tokio::signal::unix`. + #[cfg(unix)] + let server = if clean_shutdown_on_sigterm { + server.disable_signals() + } else { + server + }; + let server = if args.enable_https || args.https_tls_cert_path.is_some() || args.https_tls_key_path.is_some() @@ -936,6 +960,64 @@ pub fn run_server( tokio::fs::rename(&tmp_server_port_file, SERVER_PORT_FILE) .await .map_err(|e| ControllerError::io_error("renaming server port file", e))?; + + // Own signal handling: SIGTERM (pod eviction, node drain) checkpoints + // before shutdown; SIGINT (Ctrl-C) stops immediately. The handle is + // captured before `server` moves into `.await` below. + #[cfg(unix)] + if let Some(signal_state) = signal_state { + let handle = server.handle(); + tokio::spawn(async move { + use futures::future::{select, Either}; + use std::pin::pin; + use tokio::signal::unix::{signal, SignalKind}; + + let mut sigterm = match signal(SignalKind::terminate()) { + Ok(stream) => stream, + Err(error) => { + error!("failed to install SIGTERM handler ({error}); shutdown will not checkpoint"); + return; + } + }; + let mut sigint = match signal(SignalKind::interrupt()) { + Ok(stream) => stream, + Err(error) => { + error!("failed to install SIGINT handler ({error}); shutdown will not checkpoint"); + return; + } + }; + + // `tokio::select!` won't compile in this module: it collides + // with the `start` unit struct from the `#[get("/start")]` macro. + let sigterm_fired = { + let sigterm_recv = pin!(sigterm.recv()); + let sigint_recv = pin!(sigint.recv()); + matches!(select(sigterm_recv, sigint_recv).await, Either::Left(_)) + }; + if sigterm_fired { + info!("received SIGTERM; suspending before shutdown"); + match trigger_suspend(signal_state.clone()) { + // Await the checkpoint so `handle.stop` below doesn't abort it. + // The pod's `terminationGracePeriodSeconds` is the backstop: if + // the suspend wedges, k8s SIGKILLs us when the grace period ends. + Ok(Some(handle)) => { + let _ = handle.await; + } + // Already suspended: nothing to wait for. + Ok(None) => {} + Err(error) => error!( + "cannot suspend on signal ({error}); shutting down without checkpoint" + ), + } + } else { + info!("received SIGINT; shutting down without checkpoint"); + } + + // Drain in-flight requests, then let `server.await` return. + handle.stop(true).await; + }); + } + server .await .map_err(|e| ControllerError::io_error("in the HTTP server", e)) @@ -2007,17 +2089,24 @@ async fn sync_checkpoint_status( Ok(HttpResponse::Ok().json(sync_state.status.clone())) } -/// Suspends the pipeline and terminate the circuit. +/// Starts the suspend flow: move `desired_status` to `Suspended` and spawn the +/// task that checkpoints, then stops the controller. /// -/// This implementation is designed to be idempotent, so that any number of -/// suspend requests act like just one. -#[post("/suspend")] -async fn suspend(state: WebData) -> Result { +/// Shared by the `/suspend` handler and the `SIGTERM` handler. Returns the +/// `JoinHandle` of the spawned task, or `None` when the pipeline was already +/// suspended (nothing to wait for) — so the `SIGTERM` handler can await the +/// checkpoint before stopping the server. Idempotent (a call while already +/// `Suspended` is a no-op, so racing requests act as one) and errors only on an +/// invalid transition (e.g. from `Standby`). Synchronous and holds no lock +/// across an `.await`. +fn trigger_suspend( + state: WebData, +) -> Result>, PipelineError> { let mut desired_status = state.desired_status.lock().unwrap(); match *desired_status { RuntimeDesiredStatus::Unavailable => unreachable!(), RuntimeDesiredStatus::Standby => { - return Err(PipelineError::InvalidTransition("suspend", *desired_status)); + Err(PipelineError::InvalidTransition("suspend", *desired_status)) } RuntimeDesiredStatus::Coordination | RuntimeDesiredStatus::Running @@ -2055,10 +2144,20 @@ async fn suspend(state: WebData) -> Result (), - }; + RuntimeDesiredStatus::Suspended => Ok(None), + } +} + +/// Suspends the pipeline and terminate the circuit. +/// +/// This implementation is designed to be idempotent, so that any number of +/// suspend requests act like just one. +#[post("/suspend")] +async fn suspend(state: WebData) -> Result { + // Fire and forget: the suspend runs in the background; the client polls status. + let _ = trigger_suspend(state)?; Ok(HttpResponse::Accepted().json("Pipeline is suspending")) } diff --git a/docs.feldera.com/docs/operations/clean-shutdown.md b/docs.feldera.com/docs/operations/clean-shutdown.md new file mode 100644 index 00000000000..216d09ec11d --- /dev/null +++ b/docs.feldera.com/docs/operations/clean-shutdown.md @@ -0,0 +1,52 @@ +--- +title: Clean shutdown on SIGTERM +sidebar_position: 30 +--- + +# Clean shutdown on SIGTERM + +When Feldera stops a pipeline through its own control plane (for example, a +graceful `/stop`), the pipeline checkpoints its state and suspends before the +pods are removed. But a pipeline pod can also be terminated by the +infrastructure with no Feldera involvement — Kubernetes node drain, eviction, +Karpenter consolidation, or `kubectl delete`. These deliver `SIGTERM` to the +pipeline process. + +By default the pipeline reacts to `SIGTERM` with a graceful *HTTP* shutdown: it +drains in-flight requests and exits, but it does **not** checkpoint. Any state +accumulated since the last checkpoint is lost and must be recomputed on the next +start. + +Setting `FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM` makes the pipeline run the same +checkpoint-and-suspend flow as a graceful stop when it receives `SIGTERM`, so an +infrastructure-initiated termination resumes from a fresh checkpoint. + +## Environment variables + +| Variable | Default | Meaning | +| --- | --- | --- | +| `FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM` | off | When truthy (`1`, `true`, `yes`, `on`; case-insensitive), checkpoint and suspend on `SIGTERM` before shutting down. | + +## Kubernetes: `terminationGracePeriodSeconds` + +The feature only has an effect when the pod is given enough time to checkpoint +before the kernel kills it. Kubernetes sends `SIGTERM`, waits +`terminationGracePeriodSeconds`, then sends `SIGKILL`. If the grace period is +`0` (or too short to finish a checkpoint), the pod is `SIGKILL`ed and the +checkpoint is truncated — the same outcome as not enabling the feature. + +When enabling `FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM`: + +- Set the pipeline pod's `terminationGracePeriodSeconds` comfortably above the + expected checkpoint time (which scales with state size and object-store + latency). This is the only deadline: the pipeline waits for the checkpoint to + finish, and if it wedges, Kubernetes `SIGKILL`s the pod when the grace period + ends. + +## Other signals + +With the feature enabled the pipeline installs its own handlers for `SIGTERM` +(checkpoint, then shut down) and `SIGINT` / Ctrl-C (immediate graceful shutdown, +no checkpoint). Default handling of other signals such as `SIGHUP` and `SIGQUIT` +is no longer installed; in a Kubernetes deployment only `SIGTERM` (then +`SIGKILL`) is sent, so this does not affect normal operation. From 12ace6974c134b071dede0cdded0c7924bb93f76 Mon Sep 17 00:00:00 2001 From: feldera-bot Date: Tue, 16 Jun 2026 07:44:21 +0000 Subject: [PATCH 2/2] [ci] apply automatic fixes Signed-off-by: feldera-bot --- crates/adapters/src/server.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index 61514979532..254e703cfa3 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -2099,9 +2099,7 @@ async fn sync_checkpoint_status( /// `Suspended` is a no-op, so racing requests act as one) and errors only on an /// invalid transition (e.g. from `Standby`). Synchronous and holds no lock /// across an `.await`. -fn trigger_suspend( - state: WebData, -) -> Result>, PipelineError> { +fn trigger_suspend(state: WebData) -> Result>, PipelineError> { let mut desired_status = state.desired_status.lock().unwrap(); match *desired_status { RuntimeDesiredStatus::Unavailable => unreachable!(),