Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/adapters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ futures-util = { workspace = true }
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }
clap = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros", "fs", "rt"] }
tokio = { workspace = true, features = ["sync", "macros", "fs", "rt", "signal"] }
utoipa = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
colored = { workspace = true }
Expand Down
117 changes: 107 additions & 10 deletions crates/adapters/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use std::{
};
use tokio::spawn;
use tokio::sync::Notify;
use tokio::task::spawn_blocking;
use tokio::task::{JoinHandle, spawn_blocking};
use tokio::time::{sleep, timeout};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::WatchStream;
Expand Down Expand Up @@ -816,6 +816,21 @@ pub fn run_server(
config.global.workers as usize
};

// Deployment-wide opt-in: checkpoint-and-suspend on `SIGTERM` instead of a
// plain graceful HTTP shutdown. Off unless the env var is truthy.
let clean_shutdown_on_sigterm = std::env::var("FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM")
.map(|value| {
matches!(
value.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
})
.unwrap_or(false);

// Clone for the signal task before `state` moves into the server factory.
#[cfg(unix)]
let signal_state = clean_shutdown_on_sigterm.then(|| state.clone());

let server = HttpServer::new({
move || {
let state = state.clone();
Expand Down Expand Up @@ -875,6 +890,15 @@ pub fn run_server(
// The default in actix is 30s. We may consider making this configurable.
.shutdown_timeout(10);

// Stop actix from shutting down on signals so our handler (below) can
// checkpoint first. Unix-only, since that handler uses `tokio::signal::unix`.
#[cfg(unix)]
let server = if clean_shutdown_on_sigterm {
server.disable_signals()
Comment thread
swanandx marked this conversation as resolved.
} else {
server
};

let server = if args.enable_https
|| args.https_tls_cert_path.is_some()
|| args.https_tls_key_path.is_some()
Expand Down Expand Up @@ -936,6 +960,64 @@ pub fn run_server(
tokio::fs::rename(&tmp_server_port_file, SERVER_PORT_FILE)
.await
.map_err(|e| ControllerError::io_error("renaming server port file", e))?;

// Own signal handling: SIGTERM (pod eviction, node drain) checkpoints
// before shutdown; SIGINT (Ctrl-C) stops immediately. The handle is
// captured before `server` moves into `.await` below.
#[cfg(unix)]
if let Some(signal_state) = signal_state {
let handle = server.handle();
tokio::spawn(async move {
use futures::future::{select, Either};
use std::pin::pin;
use tokio::signal::unix::{signal, SignalKind};

let mut sigterm = match signal(SignalKind::terminate()) {
Ok(stream) => stream,
Err(error) => {
error!("failed to install SIGTERM handler ({error}); shutdown will not checkpoint");
return;
}
};
let mut sigint = match signal(SignalKind::interrupt()) {
Ok(stream) => stream,
Err(error) => {
error!("failed to install SIGINT handler ({error}); shutdown will not checkpoint");
return;
}
};

// `tokio::select!` won't compile in this module: it collides
// with the `start` unit struct from the `#[get("/start")]` macro.
let sigterm_fired = {
let sigterm_recv = pin!(sigterm.recv());
let sigint_recv = pin!(sigint.recv());
matches!(select(sigterm_recv, sigint_recv).await, Either::Left(_))
Comment thread
swanandx marked this conversation as resolved.
};
if sigterm_fired {
info!("received SIGTERM; suspending before shutdown");
match trigger_suspend(signal_state.clone()) {
// Await the checkpoint so `handle.stop` below doesn't abort it.
// The pod's `terminationGracePeriodSeconds` is the backstop: if
// the suspend wedges, k8s SIGKILLs us when the grace period ends.
Ok(Some(handle)) => {
let _ = handle.await;
}
// Already suspended: nothing to wait for.
Ok(None) => {}
Err(error) => error!(
"cannot suspend on signal ({error}); shutting down without checkpoint"
),
}
} else {
info!("received SIGINT; shutting down without checkpoint");
}

// Drain in-flight requests, then let `server.await` return.
handle.stop(true).await;
Comment thread
swanandx marked this conversation as resolved.
});
}

server
.await
.map_err(|e| ControllerError::io_error("in the HTTP server", e))
Expand Down Expand Up @@ -2007,17 +2089,22 @@ async fn sync_checkpoint_status(
Ok(HttpResponse::Ok().json(sync_state.status.clone()))
}

/// Suspends the pipeline and terminates the circuit.
/// Starts the suspend flow: moves `desired_status` to `Suspended` and spawns the
/// task that checkpoints, then stops the controller.
///
/// This implementation is designed to be idempotent, so that any number of
/// suspend requests act like just one.
#[post("/suspend")]
async fn suspend(state: WebData<ServerState>) -> Result<impl Responder, PipelineError> {
/// Shared by the `/suspend` handler and the `SIGTERM` handler. Returns the
/// `JoinHandle` of the spawned task, or `None` when the pipeline was already
/// suspended (nothing to wait for) — so the `SIGTERM` handler can await the
/// checkpoint before stopping the server. Idempotent (a call while already
/// `Suspended` is a no-op, so racing requests act as one) and errors only on an
/// invalid transition (e.g. from `Standby`). Synchronous and holds no lock
/// across an `.await`.
fn trigger_suspend(state: WebData<ServerState>) -> Result<Option<JoinHandle<()>>, PipelineError> {
let mut desired_status = state.desired_status.lock().unwrap();
match *desired_status {
RuntimeDesiredStatus::Unavailable => unreachable!(),
RuntimeDesiredStatus::Standby => {
return Err(PipelineError::InvalidTransition("suspend", *desired_status));
Err(PipelineError::InvalidTransition("suspend", *desired_status))
}
RuntimeDesiredStatus::Coordination
| RuntimeDesiredStatus::Running
Expand Down Expand Up @@ -2055,10 +2142,20 @@ async fn suspend(state: WebData<ServerState>) -> Result<impl Responder, Pipeline
// This can only be spawned once, because we only transition to
// [RuntimeDesiredStatus::Suspended] once and we never transition
// out of it.
tokio::spawn(async move { suspend(state).await });
Ok(Some(tokio::spawn(async move { suspend(state).await })))
}
RuntimeDesiredStatus::Suspended => (),
};
RuntimeDesiredStatus::Suspended => Ok(None),
}
}

/// `POST /suspend`: requests that the pipeline be suspended and its circuit
/// terminated.
///
/// The request only kicks off the suspend flow via `trigger_suspend` and
/// replies `202 Accepted` right away; the work continues in the background and
/// the client polls status. Repeated requests act like a single one. An error
/// from `trigger_suspend` is returned to the caller unchanged.
#[post("/suspend")]
async fn suspend(state: WebData<ServerState>) -> Result<impl Responder, PipelineError> {
    // The background task's handle (if any) is intentionally discarded here:
    // this endpoint never waits for the suspend to finish.
    trigger_suspend(state)
        .map(|_background_task| HttpResponse::Accepted().json("Pipeline is suspending"))
}

Expand Down
52 changes: 52 additions & 0 deletions docs.feldera.com/docs/operations/clean-shutdown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
title: Clean shutdown on SIGTERM
sidebar_position: 30
---

# Clean shutdown on SIGTERM

When Feldera stops a pipeline through its own control plane (for example, a
graceful `/stop`), the pipeline checkpoints its state and suspends before the
pods are removed. But a pipeline pod can also be terminated by the
infrastructure with no Feldera involvement — Kubernetes node drain, eviction,
Karpenter consolidation, or `kubectl delete`. These deliver `SIGTERM` to the
pipeline process.

By default the pipeline reacts to `SIGTERM` with a graceful *HTTP* shutdown: it
drains in-flight requests and exits, but it does **not** checkpoint. Any state
accumulated since the last checkpoint is lost and must be recomputed on the next
start.

Setting `FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM` makes the pipeline run the same
checkpoint-and-suspend flow as a graceful stop when it receives `SIGTERM`, so an
infrastructure-initiated termination resumes from a fresh checkpoint.

## Environment variables

| Variable | Default | Meaning |
| --- | --- | --- |
| `FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM` | off | When truthy (`1`, `true`, `yes`, `on`; case-insensitive), checkpoint and suspend on `SIGTERM` before shutting down. |

## Kubernetes: `terminationGracePeriodSeconds`

The feature only has an effect when the pod is given enough time to checkpoint
before Kubernetes kills it. Kubernetes sends `SIGTERM`, waits
`terminationGracePeriodSeconds`, then sends `SIGKILL`. If the grace period is
`0` (or too short to finish a checkpoint), the pod is `SIGKILL`ed and the
checkpoint is cut off before it completes — the same outcome as not enabling
the feature.

When enabling `FELDERA_CLEAN_SHUTDOWN_ON_SIGTERM`:

- Set the pipeline pod's `terminationGracePeriodSeconds` comfortably above the
expected checkpoint time (which scales with state size and object-store
latency). This is the only deadline: the pipeline waits for the checkpoint to
finish, and if it wedges, Kubernetes `SIGKILL`s the pod when the grace period
ends.

## Other signals

With the feature enabled, the pipeline installs its own handlers for `SIGTERM`
(checkpoint, then shut down) and `SIGINT` / Ctrl-C (immediate graceful shutdown,
no checkpoint). The HTTP server's built-in handling of other signals, such as
`SIGHUP` and `SIGQUIT`, is disabled; in a Kubernetes deployment only `SIGTERM`
(then `SIGKILL`) is sent, so this does not affect normal operation.
Loading