diff --git a/crates/stackable-certs/src/ca/mod.rs b/crates/stackable-certs/src/ca/mod.rs index c3bb57099..7f7160f09 100644 --- a/crates/stackable-certs/src/ca/mod.rs +++ b/crates/stackable-certs/src/ca/mod.rs @@ -247,7 +247,7 @@ where // // The root profile doesn't add the AuthorityKeyIdentifier extension. // We manually add it below by using the 160-bit SHA-1 hash of the - // subject pulic key. This conforms to one of the outlined methods for + // subject public key. This conforms to one of the outlined methods for // generating key identifiers outlined in RFC 5280, section 4.2.1.2. // // Prepare extensions so we can avoid clones. diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 1739c9a9c..acb71102d 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Add `SignalWatcher` which can be used to watch signals and multiply them to gracefully shutdown + multiple concurrent tasks/futures ([#1147]). +- BREAKING: Add support to gracefully shutdown `EosChecker`. + `EosChecker::run` now requires passing a shutdown signal, which is any `Future` ([#1146]). + +[#1146]: https://github.com/stackabletech/operator-rs/pull/1146 +[#1147]: https://github.com/stackabletech/operator-rs/pull/1147 + ## [0.104.0] - 2026-01-26 ### Added diff --git a/crates/stackable-operator/src/eos/mod.rs b/crates/stackable-operator/src/eos/mod.rs index 243cb365a..aaaf927fe 100644 --- a/crates/stackable-operator/src/eos/mod.rs +++ b/crates/stackable-operator/src/eos/mod.rs @@ -1,6 +1,8 @@ use chrono::{DateTime, Utc}; +use futures::FutureExt; use snafu::{ResultExt, Snafu}; use stackable_shared::time::Duration; +use tokio::select; use tracing::{Level, instrument}; /// Available options to configure a [`EndOfSupportChecker`]. @@ -114,33 +116,51 @@ impl EndOfSupportChecker { /// /// It is recommended to run the end-of-support checker via [`futures::try_join!`] or /// [`tokio::join`] alongside other futures (eg. for controllers). - pub async fn run(self) { + pub async fn run(self, shutdown_signal: F) + where + F: Future, + { // Immediately return if the end-of-support checker is disabled. if self.disabled { return; } - // Construct an interval which can be polled. let mut interval = tokio::time::interval(self.interval.into()); + let shutdown_signal = shutdown_signal.fuse(); + tokio::pin!(shutdown_signal); + loop { - // TODO: Add way to stop from the outside - // The first tick ticks immediately. - interval.tick().await; - let now = Utc::now(); - - tracing::info_span!( - "checking end-of-support state", - eos.interval = self.interval.to_string(), - eos.now = now.to_rfc3339(), - ); - - // Continue the loop and wait for the next tick to run the check again. - if now <= self.eos_datetime { - continue; + select! { + // We used a biased polling strategy to always check if a + // shutdown signal was received before polling the EoS check + // interval. + biased; + + _ = &mut shutdown_signal => { + tracing::trace!("received shutdown signal"); + break; + } + + // The first tick ticks immediately. + _ = interval.tick() => { + let now = Utc::now(); + + tracing::info_span!( + "checking end-of-support state", + eos.interval = self.interval.to_string(), + eos.now = now.to_rfc3339(), + ); + + // Continue the loop and wait for the next tick to run the + // check again. + if now <= self.eos_datetime { + continue; + } + + self.emit_warning(now); + } } - - self.emit_warning(now); } } diff --git a/crates/stackable-operator/src/utils/mod.rs b/crates/stackable-operator/src/utils/mod.rs index a08b24a43..7f51eda2c 100644 --- a/crates/stackable-operator/src/utils/mod.rs +++ b/crates/stackable-operator/src/utils/mod.rs @@ -3,6 +3,8 @@ pub mod cluster_info; pub mod crds; pub mod kubelet; pub mod logging; +pub mod signal; + mod option; mod url; diff --git a/crates/stackable-operator/src/utils/signal.rs b/crates/stackable-operator/src/utils/signal.rs new file mode 100644 index 000000000..c585e1ea7 --- /dev/null +++ b/crates/stackable-operator/src/utils/signal.rs @@ -0,0 +1,73 @@ +use snafu::{ResultExt, Snafu}; +use tokio::{ + signal::unix::{SignalKind, signal}, + sync::watch, +}; + +#[derive(Debug, Snafu)] +#[snafu(display("failed to construct signal listener"))] +pub struct SignalError { + source: std::io::Error, +} + +/// Watches for the incoming signal and multiplies it by sending it to all acquired handles. +pub struct SignalWatcher +where + T: Send + Sync + 'static, +{ + watch_rx: watch::Receiver, +} + +impl SignalWatcher +where + T: Default + Send + Sync + 'static, +{ + /// Watches the provided `signal` and multiplies the signal by sending it to all acquired handles + /// constructed through [`SignalWatcher::handle`]. + pub fn new(signal: F) -> Self + where + F: Future + Send + Sync + 'static, + { + let (watch_tx, watch_rx) = watch::channel(T::default()); + + tokio::spawn(async move { + let value = signal.await; + watch_tx.send(value) + }); + + Self { watch_rx } + } + + /// Acquire a new handle which will complete once a `SIGTERM` signal is received. + /// + /// This handle can be cheaply cloned to be able to gracefully shutdown multiple concurrent + /// tasks. + pub fn handle(&self) -> impl Future + use { + let mut watch_rx = self.watch_rx.clone(); + + async move { + watch_rx.changed().await.ok(); + } + } +} + +impl SignalWatcher<()> { + /// Watches the `SIGTERM` signal and multiplies the signal by sending it to all acquired handlers + /// constructed through [`SignalWatcher::handle`]. + // + // NOTE (@Techassi): Note Accepting a generic Future here is possible, but + // `Signal::recv` borrows instead of consuming which clashes with the 'static lifetime + // requirement of `tokio::spawn`. That's why I opted for watching for a particular signal + // internally instead of requiring users to pass the signal to this function. + pub fn sigterm() -> Result { + let mut sigterm = signal(SignalKind::terminate()).context(SignalSnafu)?; + let (watch_tx, watch_rx) = watch::channel(()); + + tokio::spawn(async move { + sigterm.recv().await; + watch_tx.send(()) + }); + + Ok(Self { watch_rx }) + } +} diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index cd9d460c1..a8fe9fe2e 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- BREAKING: Add support to gracefully shutdown `WebhookServer` and `TlsServer` ([#1144]). + Both `WebhookServer::run` and `TlsServer::run` now require passing a shutdown signal, which is any + `Future`. + +[#1144]: https://github.com/stackabletech/operator-rs/pull/1144 + ## [0.8.1] - 2026-01-07 ### Fixed diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 43b16d816..d74a69fbd 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -15,15 +15,11 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use ::x509_cert::Certificate; use axum::{Router, routing::get}; -use futures_util::{FutureExt as _, TryFutureExt, select}; +use futures_util::TryFutureExt; use k8s_openapi::ByteString; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; -use tokio::{ - signal::unix::{SignalKind, signal}, - sync::mpsc, - try_join, -}; +use tokio::{sync::mpsc, try_join}; use tower::ServiceBuilder; use webhooks::{Webhook, WebhookError}; use x509_cert::der::{EncodePem, pem::LineEnding}; @@ -59,6 +55,7 @@ pub enum WebhookServerError { /// /// ``` /// use stackable_webhook::{WebhookServer, WebhookServerOptions, webhooks::Webhook}; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// let mut webhooks: Vec> = vec![]; @@ -69,8 +66,9 @@ pub enum WebhookServerError { /// webhook_service_name: "my-operator".to_owned(), /// }; /// let webhook_server = WebhookServer::new(webhooks, webhook_options).await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); /// -/// webhook_server.run().await.unwrap(); +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// ``` pub struct WebhookServer { @@ -154,52 +152,16 @@ impl WebhookServer { }) } - /// Runs the Webhook server and sets up signal handlers for shutting down. + /// Runs the [`WebhookServer`] and handles underlying certificate rotations of the [`TlsServer`]. /// - /// This does not implement graceful shutdown of the underlying server. Additionally, the server - /// is never started in cases where no [`Webhook`] is registered. Callers of this function need - /// to ensure to choose the correct joining mechanism for their use-case to for example avoid - /// unexpected shutdowns of the whole Kubernetes controller. - pub async fn run(self) -> Result<()> { - let future_server = self.run_server(); - let future_signal = async { - let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener"); - let mut sigterm = signal(SignalKind::terminate()).expect("create SIGTERM listener"); - - tracing::debug!("created unix signal handlers"); - - select! { - signal = sigint.recv().fuse() => { - if signal.is_some() { - tracing::debug!( "received SIGINT"); - } - }, - signal = sigterm.recv().fuse() => { - if signal.is_some() { - tracing::debug!( "received SIGTERM"); - } - }, - }; - }; - - // select requires Future + Unpin - tokio::pin!(future_server); - tokio::pin!(future_signal); - - tokio::select! { - res = &mut future_server => { - // If the server future errors, propagate the error - res?; - } - _ = &mut future_signal => { - tracing::info!("shutdown signal received, stopping webhook server"); - } - } - - Ok(()) - } - - async fn run_server(self) -> Result<()> { + /// It should be noted that the server is never started in cases where no [`Webhook`] is + /// registered. Callers of this function need to ensure to choose the correct joining mechanism + /// for their use-case to for example avoid unexpected shutdowns of the whole Kubernetes + /// controller. + pub async fn run(self, shutdown_signal: F) -> Result<()> + where + F: Future, + { tracing::debug!("run webhook server"); let Self { @@ -217,13 +179,29 @@ impl WebhookServer { } let tls_server = tls_server - .run() + .run(shutdown_signal) .map_err(|err| WebhookServerError::RunTlsServer { source: err }); let cert_update_loop = async { - while let Some(cert) = cert_rx.recv().await { + // Once the shutdown signal is triggered, the TlsServer above should be dropped as the + // run associated function consumes self. This in turn means that when the receiver is + // polled, it will return `Ok(Ready(None))`, which will cause this while loop to break + // and the future to complete. + while let Some(certificate) = cert_rx.recv().await { + // NOTE (@Techassi): There are currently NO semantic conventions for X509 certificates + // and as such, these are pretty much made up and potentially not ideal. + #[rustfmt::skip] + tracing::info!( + x509.not_before = certificate.tbs_certificate.validity.not_before.to_string(), + x509.not_after = certificate.tbs_certificate.validity.not_after.to_string(), + x509.serial_number = certificate.tbs_certificate.serial_number.to_string(), + x509.subject = certificate.tbs_certificate.subject.to_string(), + x509.issuer = certificate.tbs_certificate.issuer.to_string(), + "rotate certificate for registered webhooks" + ); + // The caBundle needs to be provided as a base64-encoded PEM envelope. - let ca_bundle = cert + let ca_bundle = certificate .to_pem(LineEnding::LF) .context(EncodeCertificateAuthorityAsPemSnafu)?; let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec()); @@ -243,6 +221,8 @@ impl WebhookServer { Ok(()) }; + // This either returns if one of the two futures complete with Err(_) or when both complete + // with Ok(_). Both futures complete with Ok(_) when a shutdown signal is received. try_join!(cert_update_loop, tls_server).map(|_| ()) } } diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index fa493159d..3118392fe 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -7,9 +7,10 @@ use axum::{ extract::{ConnectInfo, Request}, middleware::AddExtension, }; +use futures_util::FutureExt as _; use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; -use opentelemetry::trace::{FutureExt, SpanKind}; +use opentelemetry::trace::{FutureExt as _, SpanKind}; use opentelemetry_semantic_conventions as semconv; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_shared::time::Duration; @@ -138,17 +139,27 @@ impl TlsServer { /// router. /// /// It also starts a background task to rotate the certificate as needed. - pub async fn run(self) -> Result<()> { + /// + /// The `shutdown_signal` can be used to notify the [`TlsServer`] to + /// gracefully shutdown. + pub async fn run(self, shutdown_signal: F) -> Result<()> + where + F: Future, + { + let Self { + cert_resolver, + socket_addr, + config, + router, + } = self; + let start = tokio::time::Instant::now() + *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL; let mut interval = tokio::time::interval_at(start, *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL); - let tls_acceptor = TlsAcceptor::from(Arc::new(self.config)); - let tcp_listener = - TcpListener::bind(self.socket_addr) - .await - .context(BindTcpListenerSnafu { - socket_addr: self.socket_addr, - })?; + let tls_acceptor = TlsAcceptor::from(Arc::new(config)); + let tcp_listener = TcpListener::bind(socket_addr) + .await + .context(BindTcpListenerSnafu { socket_addr })?; // To be able to extract the connect info from incoming requests, it is // required to turn the router into a Tower service which is capable of @@ -161,24 +172,35 @@ impl TlsServer { // - https://github.com/tokio-rs/axum/discussions/2397 // - https://github.com/tokio-rs/axum/blob/b02ce307371a973039018a13fa012af14775948c/examples/serve-with-hyper/src/main.rs#L98 - let mut router = self - .router - .into_make_service_with_connect_info::(); + let mut router = router.into_make_service_with_connect_info::(); + + // Fuse the future so that it only yields `Poll::Ready` once. The future + // additionally needs to be pinned to be able to be used in the select! + // macro below. + let shutdown_signal = shutdown_signal.fuse(); + tokio::pin!(shutdown_signal); loop { let tls_acceptor = tls_acceptor.clone(); // Wait for either a new TCP connection or the certificate rotation interval tick tokio::select! { - // We opt for a biased execution of arms to make sure we always check if the - // certificate needs rotation based on the interval. This ensures, we always use - // a valid certificate for the TLS connection. + // We opt for a biased execution of arms to make sure we always check if a + // shutdown signal was received or the certificate needs rotation based on the + // interval. This ensures, we always use a valid certificate for the TLS connection. biased; + // As soon as this future because `Poll::Ready`, break out of the loop which cancels + // the certification rotation interval and stops accepting new TCP connections. + _ = &mut shutdown_signal => { + tracing::trace!("received shutdown signal"); + break; + } + // This is cancellation-safe. If this branch is cancelled, the tick is NOT consumed. // As such, we will not miss rotating the certificate. _ = interval.tick() => { - self.cert_resolver + cert_resolver .rotate_certificate() .await .context(RotateCertificateSnafu)? @@ -210,6 +232,8 @@ impl TlsServer { } }; } + + Ok(()) } async fn handle_request( diff --git a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs index c0f11b7d0..64cd58f2d 100644 --- a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs @@ -22,8 +22,7 @@ use snafu::{ResultExt, Snafu}; use tokio::sync::oneshot; use tracing::instrument; -use super::{Webhook, WebhookError}; -use crate::WebhookServerOptions; +use crate::{Webhook, WebhookError, WebhookServerOptions}; #[derive(Debug, Snafu)] pub enum ConversionWebhookError { @@ -51,6 +50,7 @@ pub enum ConversionWebhookError { /// WebhookServer, /// webhooks::{ConversionWebhook, ConversionWebhookOptions}, /// }; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// // The Kubernetes client @@ -75,7 +75,9 @@ pub enum ConversionWebhookError { /// let webhook_server = WebhookServer::new(vec![Box::new(conversion_webhook)], webhook_options) /// .await /// .unwrap(); -/// webhook_server.run().await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); +/// +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// ``` pub struct ConversionWebhook { @@ -135,7 +137,7 @@ impl ConversionWebhook { } #[instrument( - skip(self, crd, crd_api, new_ca_bundle), + skip(self, crd, crd_api, ca_bundle), fields( name = crd.name_any(), kind = &crd.spec.names.kind @@ -145,7 +147,7 @@ impl ConversionWebhook { &self, mut crd: CustomResourceDefinition, crd_api: &Api, - new_ca_bundle: &ByteString, + ca_bundle: ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError> { let crd_kind = &crd.spec.names.kind; @@ -175,7 +177,7 @@ impl ConversionWebhook { port: Some(options.socket_addr.port().into()), }), // Here, ByteString takes care of encoding the provided content as base64. - ca_bundle: Some(new_ca_bundle.to_owned()), + ca_bundle: Some(ca_bundle), url: None, }), }), @@ -244,15 +246,15 @@ where self.options.disable_crd_maintenance } - #[instrument(skip(self, new_ca_bundle))] + #[instrument(skip(self, ca_bundle))] async fn handle_certificate_rotation( &mut self, - new_ca_bundle: &ByteString, + ca_bundle: &ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError> { let crd_api: Api = Api::all(self.client.clone()); for (crd, _) in &self.crds_and_handlers { - self.reconcile_crd(crd.clone(), &crd_api, new_ca_bundle, options) + self.reconcile_crd(crd.clone(), &crd_api, ca_bundle.to_owned(), options) .await?; } diff --git a/crates/stackable-webhook/src/webhooks/mod.rs b/crates/stackable-webhook/src/webhooks/mod.rs index 23cbca7ef..ac165efac 100644 --- a/crates/stackable-webhook/src/webhooks/mod.rs +++ b/crates/stackable-webhook/src/webhooks/mod.rs @@ -48,7 +48,7 @@ pub trait Webhook { /// Webhooks are informed about new certificates by this function and can react accordingly. async fn handle_certificate_rotation( &mut self, - new_ca_bundle: &ByteString, + ca_bundle: &ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError>; } diff --git a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs index 1beb12cde..9ff9192fd 100644 --- a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs @@ -12,8 +12,7 @@ use serde::{Serialize, de::DeserializeOwned}; use snafu::{ResultExt, Snafu}; use tracing::instrument; -use super::{Webhook, WebhookError}; -use crate::{WebhookServerOptions, webhooks::create_webhook_client_config}; +use crate::{Webhook, WebhookError, WebhookServerOptions, webhooks::create_webhook_client_config}; #[derive(Debug, Snafu)] pub enum MutatingWebhookError { @@ -51,6 +50,7 @@ pub enum MutatingWebhookError { /// WebhookServer, /// webhooks::{MutatingWebhook, MutatingWebhookOptions}, /// }; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// // The Kubernetes client @@ -76,7 +76,9 @@ pub enum MutatingWebhookError { /// let webhook_server = WebhookServer::new(vec![mutating_webhook], webhook_options) /// .await /// .unwrap(); -/// webhook_server.run().await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); +/// +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// /// fn get_mutating_webhook_configuration() -> MutatingWebhookConfiguration { @@ -206,14 +208,15 @@ where self.options.disable_mwc_maintenance } - #[instrument(skip(self, new_ca_bundle))] + #[instrument(skip(self, ca_bundle))] async fn handle_certificate_rotation( &mut self, - new_ca_bundle: &ByteString, + ca_bundle: &ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError> { let mut mutating_webhook_configuration = self.mutating_webhook_configuration.clone(); let mwc_name = mutating_webhook_configuration.name_any(); + tracing::info!( k8s.mutatingwebhookconfiguration.name = mwc_name, "reconciling mutating webhook configurations" @@ -222,7 +225,7 @@ where for webhook in mutating_webhook_configuration.webhooks.iter_mut().flatten() { // We know how we can be called (and with what certificate), so we can always set that webhook.client_config = - create_webhook_client_config(options, new_ca_bundle.to_owned(), self.http_path()); + create_webhook_client_config(options, ca_bundle.to_owned(), self.http_path()); } let mwc_api: Api = Api::all(self.client.clone());