From 388b21ec8d5e020a34bfe71845fe30738064fbad Mon Sep 17 00:00:00 2001 From: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> Date: Tue, 5 May 2026 09:32:34 +0000 Subject: [PATCH 1/2] [adapters] add multihost S3 checkpoint sync coordination endpoints - Add POST /coordination/checkpoint/push for the coordinator to direct each pod to sync its checkpoint to S3. - Add POST /coordination/checkpoint/pull (non-blocking, 202 Accepted) and GET /coordination/checkpoint/pull_status so the coordinator can trigger and poll checkpoint pulls without blocking. - Guard POST /checkpoint/sync against multihost pipelines; all multihost sync requests must go through /coordination/checkpoint/push. - Refactor test_checkpoint_sync into focused per-scenario test methods with shared helper utilities. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --- crates/adapters/src/controller.rs | 23 +- crates/adapters/src/controller/sync.rs | 71 ++- crates/adapters/src/server.rs | 193 ++++++- crates/dbsp/src/circuit/dbsp_handle.rs | 17 + crates/feldera-types/src/checkpoint.rs | 77 +++ crates/feldera-types/src/config.rs | 24 +- .../src/api/endpoints/pipeline_interaction.rs | 9 +- crates/storage/src/checkpoint_synchronizer.rs | 19 +- openapi.json | 9 +- python/tests/platform/test_checkpoint_sync.py | 499 +++++++++++------- .../tests/platform/test_connector_status.py | 6 +- python/tests/platform/test_metrics_logs.py | 16 +- python/tests/platform/test_orchestration.py | 16 +- .../tests/platform/test_pipeline_configs.py | 10 +- .../tests/platform/test_pipeline_lifecycle.py | 6 +- python/tests/platform/test_shared_pipeline.py | 6 +- python/tests/runtime/test_connector_status.py | 12 +- python/tests/shared_test_pipeline.py | 7 +- 18 files changed, 745 insertions(+), 275 deletions(-) diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 49ce170204b..f45a11a79a5 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -83,7 +83,7 @@ use feldera_types::adapter_stats::{ ConnectorHealth, ExternalControllerStatus, ExternalInputEndpointStatus, ExternalOutputEndpointStatus, }; -use feldera_types::checkpoint::{CheckpointActivity, CheckpointMetadata}; +use feldera_types::checkpoint::{CheckpointActivity, CheckpointMetadata, HostInfo}; use feldera_types::coordination::{ self, AdHocCatalog, AdHocTableType, CheckpointCoordination, Completion, StepAction, StepInputs, StepRequest, StepStatus, TransactionCoordination, @@ -143,7 +143,7 @@ mod error; mod journal; mod pipeline_diff; mod stats; -mod sync; +pub(crate) mod sync; mod validate; use crate::adhoc::table::AdHocTable; @@ -287,7 +287,7 @@ impl ControllerBuilder { pub(crate) fn pull_once(&self, _sync: &SyncConfig) -> Result<(), ControllerError> { #[cfg(feature = "feldera-enterprise")] if let Some(storage) = &self.storage { - return sync::pull_once(storage, _sync); + return sync::pull_once(storage, _sync, None); }; Ok(()) @@ -300,7 +300,7 @@ impl ControllerBuilder { { #[cfg(feature = "feldera-enterprise")] if let Some(storage) = &self.storage { - sync::continuous_pull(storage, _is_activated) + sync::continuous_pull(storage, _is_activated, None) } else { Err(ControllerError::InvalidStandby( "standby mode requires storage configuration", @@ -351,6 +351,17 @@ impl ControllerBuilder { pub(crate) fn storage(&self) -> Option> { self.storage.as_ref().map(|storage| storage.backend.clone()) } + + /// Returns the sync configuration, if one is present in the storage backend. + pub(crate) fn sync_config(&self) -> Option { + self.storage.as_ref().and_then(|s| { + if let StorageBackendConfig::File(ref file_cfg) = s.options.backend { + file_cfg.sync.clone() + } else { + None + } + }) + } } /// Controller that coordinates the creation, reconfiguration, teardown of @@ -2201,11 +2212,12 @@ struct CheckpointSyncThread { uuid: uuid::Uuid, storage: Arc, config: SyncConfig, + host_info: Option, } impl CheckpointSyncThread { fn run(self) -> Result<(), Arc> { - match SYNCHRONIZER.push(self.uuid, self.storage, self.config) { + match SYNCHRONIZER.push(self.uuid, self.storage, self.config, self.host_info) { Err(err) => { CHECKPOINT_SYNC_PUSH_FAILURES.fetch_add(1, Ordering::Relaxed); Err(Arc::new(ControllerError::checkpoint_push_error( @@ -2291,6 +2303,7 @@ impl RunningCheckpointSync { ), ))?, config: sync.to_owned(), + host_info: circuit.controller.layout.host_info(), }; let unparker = circuit.parker.unparker().clone(); let join_handle = std::thread::Builder::new() diff --git a/crates/adapters/src/controller/sync.rs b/crates/adapters/src/controller/sync.rs index c07c3b2d289..437904d8a19 100644 --- a/crates/adapters/src/controller/sync.rs +++ b/crates/adapters/src/controller/sync.rs @@ -1,23 +1,25 @@ -#![allow(unused_imports)] +#[cfg(feature = "feldera-enterprise")] use anyhow::Context; -use std::sync::{ - Arc, LazyLock, Mutex, Weak, - atomic::{AtomicU64, Ordering}, -}; +#[cfg(feature = "feldera-enterprise")] +use std::sync::atomic::Ordering; +use std::sync::{Arc, LazyLock, atomic::AtomicU64}; +#[cfg(feature = "feldera-enterprise")] use dbsp::circuit::{CircuitStorageConfig, checkpointer::Checkpointer}; use feldera_adapterlib::errors::journal::ControllerError; +#[cfg(feature = "feldera-enterprise")] +use feldera_storage::StoragePath; use feldera_storage::{ - StorageBackend, StoragePath, checkpoint_synchronizer::CheckpointSynchronizer, + StorageBackend, checkpoint_synchronizer::CheckpointSynchronizer, histogram::ExponentialHistogram, }; +#[cfg(feature = "feldera-enterprise")] use feldera_types::{ checkpoint::CheckpointMetadata, - config::{FileBackendConfig, StorageBackendConfig, SyncConfig}, + config::{FileBackendConfig, StorageBackendConfig}, constants::ACTIVATION_MARKER_FILE, }; - -use crate::server::ServerState; +use feldera_types::{checkpoint::HostInfo, config::SyncConfig}; // Pull metrics /// Bytes transferred when pulling a checkpoint. @@ -78,9 +80,11 @@ fn pull_and_gc( storage: Arc, sync: &SyncConfig, prev: &mut uuid::Uuid, + host_info: Option, + standby: bool, ) -> Result { match SYNCHRONIZER - .pull(storage.clone(), sync.to_owned()) + .pull(storage.clone(), sync.to_owned(), host_info, standby) .map_err(|e| ControllerError::checkpoint_fetch_error(format!("{e:?}"))) { Err(err) => { @@ -127,16 +131,53 @@ pub fn is_pull_necessary(storage: &CircuitStorageConfig) -> Option<&SyncConfig> } #[cfg(feature = "feldera-enterprise")] -pub fn pull_once(storage: &CircuitStorageConfig, sync: &SyncConfig) -> Result<(), ControllerError> { - pull_and_gc(storage.backend.clone(), sync, &mut uuid::Uuid::nil())?; +pub fn pull_once( + storage: &CircuitStorageConfig, + sync: &SyncConfig, + host_info: Option, +) -> Result<(), ControllerError> { + pull_and_gc( + storage.backend.clone(), + sync, + &mut uuid::Uuid::nil(), + host_info, + false, + )?; + + Ok(()) +} +/// Pulls the latest checkpoint from object storage directly using a storage +/// backend, without a full `CircuitStorageConfig`. +/// +/// Used by the multihost coordinator's pull endpoint, which has access to the +/// storage backend but not the full circuit storage config. +#[cfg(feature = "feldera-enterprise")] +pub fn pull_once_with_backend( + storage: Arc, + sync: &SyncConfig, + host_info: Option, + standby: bool, +) -> Result<(), ControllerError> { + pull_and_gc(storage, sync, &mut uuid::Uuid::nil(), host_info, standby)?; Ok(()) } +#[cfg(not(feature = "feldera-enterprise"))] +pub fn pull_once_with_backend( + _storage: Arc, + _sync: &SyncConfig, + _host_info: Option, + _standby: bool, +) -> Result<(), ControllerError> { + Err(ControllerError::EnterpriseFeature("checkpoint pull")) +} + #[cfg(feature = "feldera-enterprise")] pub fn continuous_pull( storage: &CircuitStorageConfig, is_activated: F, + host_info: Option, ) -> Result<(), ControllerError> where F: Fn() -> bool, @@ -194,7 +235,7 @@ where // Also, if we receive an activation signal, we run one more iteration to // ensure that we have the latest checkpoint before activating. loop { - match pull_and_gc(storage.backend.clone(), sync, &mut prev) { + match pull_and_gc(storage.backend.clone(), sync, &mut prev, host_info, true) { Err(err) => { // On our final attempt to pull the checkpoint after activation, if we fail, we should error out and not activate with a potentially stale or missing checkpoint. if pull_once_again_after_activation { @@ -206,10 +247,6 @@ where } }; - if !sync.standby { - return Ok(()); - } - if is_activated() { if pull_once_again_after_activation { // We've already done one iteration after activation, now we're done diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index 22027506c61..4b84d13bf20 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -56,12 +56,13 @@ use feldera_types::adapter_stats::{ PipelineStatsErrorsResponse, }; use feldera_types::checkpoint::{ - CheckpointFailure, CheckpointResponse, CheckpointStatus, CheckpointSyncFailure, - CheckpointSyncResponse, CheckpointSyncStatus, + CheckpointFailure, CheckpointPullStatus, CheckpointResponse, CheckpointStatus, + CheckpointSyncFailure, CheckpointSyncResponse, CheckpointSyncStatus, HostInfo, }; use feldera_types::completion_token::{ CompletionStatusArgs, CompletionStatusResponse, CompletionTokenResponse, }; +use feldera_types::config::SyncConfig; use feldera_types::constants::STATUS_FILE; use feldera_types::coordination::{ AdHocScan, CoordinationActivate, CoordinationStatus, Labels, RestartArgs, Step, StepRequest, @@ -292,6 +293,20 @@ pub(crate) struct ServerState { storage: Option>, + /// Sync configuration, extracted from the storage backend at startup. + /// + /// Used by the `/coordination/checkpoint/pull` endpoint to pull checkpoints + /// from object storage on behalf of the multihost coordinator. + sync_config: Option, + + /// Host identity of this pod within a multihost pipeline, derived from + /// `--host-id` and `config.global.hosts` at startup. `None` for solo + /// pipelines. + host_info: Option, + + /// Status of the most recent background checkpoint pull. + pull_state: Mutex, + // rate limiter based on tags // NOTE: we assume that there are a finite small number // of tags, so using String is fine. @@ -316,6 +331,7 @@ struct Lease { } impl ServerState { + #[allow(clippy::too_many_arguments)] fn new( phase: PipelinePhase, md: String, @@ -323,6 +339,8 @@ impl ServerState { bootstrap_policy: BootstrapPolicy, deployment_id: Uuid, storage: Option>, + sync_config: Option, + host_info: Option, ) -> Self { // Max 10 errors per minute let rate_limiter = TokenBucketRateLimiter::new(10, Duration::from_secs(60)); @@ -337,6 +355,9 @@ impl ServerState { bootstrap_policy: Atomic::new(bootstrap_policy), deployment_id, storage, + sync_config, + host_info, + pull_state: Default::default(), rate_limiter, samply_state: Default::default(), coordination_activate: Default::default(), @@ -353,6 +374,8 @@ impl ServerState { BootstrapPolicy::Allow, deployment_id, None, + None, + None, ) } @@ -732,6 +755,10 @@ pub fn run_server( return Err(ControllerError::InvalidInitialStatus(initial_status)); } + let host_info = args.host_id.map(|host_idx| HostInfo { + host_idx, + n_hosts: config.global.hosts, + }); let state = WebData::new(ServerState::new( PipelinePhase::Initializing(InitializationState::Starting), md, @@ -739,6 +766,8 @@ pub fn run_server( bootstrap_policy, args.deployment_id, builder.storage().clone(), + builder.sync_config(), + host_info, )); // Initialize the pipeline in a separate thread. On success, this thread @@ -1245,6 +1274,9 @@ where .service(coordination_checkpoint_status) .service(coordination_checkpoint_prepare) .service(coordination_checkpoint_release) + .service(coordination_checkpoint_pull) + .service(coordination_checkpoint_pull_status) + .service(coordination_checkpoint_push) .service(coordination_transaction_status) .service(coordination_completion_status) .service(coordination_adhoc_catalog) @@ -1917,12 +1949,23 @@ fn get_checkpoints(state: &ServerState) -> Result, async fn checkpoint_sync(state: WebData) -> Result { let controller = state.controller()?; + if controller.layout().is_multihost() { + return Ok(HttpResponse::BadRequest().json(ErrorResponse { + message: "checkpoint sync is not supported directly on multihost pipelines; \ + sync requests must go through the coordinator via \ + `/coordination/checkpoint/push`" + .to_string(), + error_code: "400".into(), + details: serde_json::Value::Null, + })); + } + let Some(last_checkpoint) = get_checkpoints(&state)?.back().map(|c| c.uuid) else { return Ok(HttpResponse::BadRequest().json(ErrorResponse { - message: "no checkpoints found; make a POST request to `/checkpoint` to make a new checkpoint".to_string(), - error_code: "400".into(), - details: serde_json::Value::Null, - })); + message: "no checkpoints found; make a POST request to `/checkpoint` to make a new checkpoint".to_string(), + error_code: "400".into(), + details: serde_json::Value::Null, + })); }; spawn(async move { @@ -1937,6 +1980,47 @@ async fn checkpoint_sync(state: WebData) -> Result, + body: web::Json, +) -> Result { + let uuid = body.into_inner().uuid; + let controller = state.controller()?; + + if get_checkpoints(&state)?.iter().all(|c| c.uuid != uuid) { + return Ok(HttpResponse::BadRequest().json(ErrorResponse { + message: format!("checkpoint '{uuid}' not found in local storage"), + error_code: "400".into(), + details: serde_json::Value::Null, + })); + } + + spawn(async move { + let result = controller.async_sync_checkpoint(uuid).await; + state + .sync_checkpoint_state + .lock() + .unwrap() + .completed(uuid, result); + }); + + Ok(HttpResponse::Accepted().json(CheckpointSyncResponse::new(uuid))) +} + /// Initiates a checkpoint and returns its sequence number. The caller may poll /// `/checkpoint_status` to determine when the checkpoint completes. #[post("/checkpoint")] @@ -2544,6 +2628,99 @@ async fn coordination_checkpoint_release( Ok(HttpResponse::Ok().finish()) } +/// Request body for `POST /coordination/checkpoint/pull`. +#[derive(Deserialize)] +struct CoordinationPullBody { + #[serde(default)] + standby: bool, +} + +/// Pulls the latest checkpoint from object storage into local storage. +/// +/// Returns 202 Accepted and starts a background pull. If a pull is already in +/// progress, returns 200 OK without starting a new one. Poll +/// `GET /coordination/checkpoint/pull_status` for the result. Fails +/// synchronously only if the pipeline is already running or storage/sync config +/// is absent. +#[post("/coordination/checkpoint/pull")] +async fn coordination_checkpoint_pull( + state: WebData, + body: web::Json, +) -> Result { + if matches!(state.phase(), PipelinePhase::InitializationComplete) { + return Err(PipelineError::ControllerError { + error: Arc::new(ControllerError::checkpoint_fetch_error( + "checkpoint pull is not allowed while the pipeline is already running".to_string(), + )), + }); + } + + let storage = state + .storage + .clone() + .ok_or_else(|| PipelineError::ControllerError { + error: Arc::new(ControllerError::checkpoint_fetch_error( + "checkpoint pull requires storage to be configured".to_string(), + )), + })?; + let sync = state + .sync_config + .clone() + .ok_or_else(|| PipelineError::ControllerError { + error: Arc::new(ControllerError::checkpoint_fetch_error( + "checkpoint pull requires sync to be configured".to_string(), + )), + })?; + + let host_info = state.host_info; + let standby = body.into_inner().standby; + + { + let mut pull_state = state.pull_state.lock().unwrap(); + if matches!(*pull_state, CheckpointPullStatus::InProgress) { + return Ok(HttpResponse::Ok().finish()); + } + *pull_state = CheckpointPullStatus::InProgress; + } + info!("coordination checkpoint pull: host_info={host_info:?} standby={standby}"); + + spawn(async move { + let result = spawn_blocking(move || { + crate::controller::sync::pull_once_with_backend(storage, &sync, host_info, standby) + }) + .await + .unwrap(); + + let new_status = match result { + Ok(()) => { + info!("coordination checkpoint pull: done"); + CheckpointPullStatus::Ok + } + Err(e) => { + error!("coordination checkpoint pull failed: {e:?}"); + CheckpointPullStatus::Error { + error: e.to_string(), + } + } + }; + *state.pull_state.lock().unwrap() = new_status; + }); + + Ok(HttpResponse::Accepted().finish()) +} + +/// Returns the status of the most recent `POST /coordination/checkpoint/pull`. +/// +/// Returns one of: +/// - `{"status": "not_requested"}` — no pull has been requested yet. +/// - `{"status": "in_progress"}` — a pull is currently running. +/// - `{"status": "ok"}` — the pull completed successfully. +/// - `{"status": "error", "error": "..."}` — the pull failed. +#[get("/coordination/checkpoint/pull_status")] +async fn coordination_checkpoint_pull_status(state: WebData) -> HttpResponse { + HttpResponse::Ok().json(state.pull_state.lock().unwrap().clone()) +} + #[get("/coordination/transaction/status")] async fn coordination_transaction_status( state: WebData, @@ -2906,6 +3083,8 @@ outputs: BootstrapPolicy::Allow, Uuid::new_v4(), None, + None, + None, )); let state_clone = state.clone(); @@ -3199,6 +3378,8 @@ outputs: BootstrapPolicy::Allow, Uuid::default(), None, + None, + None, )); let state_clone = state.clone(); diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 048ffaea610..5615270ddb9 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -235,6 +235,23 @@ impl Layout { Layout::Multihost { local_host_idx, .. } => *local_host_idx, } } + + /// Returns [`HostInfo`] for this host when running in multihost mode, + /// or `None` for solo pipelines. + /// + /// [`HostInfo`]: feldera_types::checkpoint::HostInfo + pub fn host_info(&self) -> Option { + match self { + Layout::Solo { .. } => None, + Layout::Multihost { + hosts, + local_host_idx, + } => Some(feldera_types::checkpoint::HostInfo { + host_idx: *local_host_idx, + n_hosts: hosts.len(), + }), + } + } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] diff --git a/crates/feldera-types/src/checkpoint.rs b/crates/feldera-types/src/checkpoint.rs index 13bcb857d0b..7eff0b1840d 100644 --- a/crates/feldera-types/src/checkpoint.rs +++ b/crates/feldera-types/src/checkpoint.rs @@ -128,6 +128,33 @@ pub struct CheckpointMetadata { pub processed_records: Option, } +/// Identifies a host within a multihost pipeline. +/// +/// Used to scope checkpoint sync operations (push/pull) to the correct +/// remote subdirectory. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct HostInfo { + /// Zero-based index of this host in the pipeline layout. + pub host_idx: usize, + /// Total number of hosts in the pipeline layout. + pub n_hosts: usize, +} + +impl HostInfo { + /// Returns the remote storage subdirectory prefix for this host, + /// e.g. `"host0"` for index 0. + pub fn prefix(&self) -> String { + if self.host_idx >= self.n_hosts { + log::warn!( + "HostInfo::prefix: host_idx {} >= n_hosts {}", + self.host_idx, + self.n_hosts + ); + } + format!("host{}", self.host_idx) + } +} + /// Format of `pspine-batches-*.dat` in storage. /// /// These files exist to be a simple format for higher-level code and outside @@ -144,3 +171,53 @@ pub struct CheckpointSyncMetrics { pub speed: u64, pub bytes: u64, } + +/// Status of a `POST /coordination/checkpoint/pull` operation. +/// +/// Returned by `GET /coordination/checkpoint/pull_status`. +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(tag = "status", rename_all = "snake_case")] +pub enum CheckpointPullStatus { + /// No pull has been requested yet. + #[default] + NotRequested, + /// A pull is currently in progress. + InProgress, + /// The pull completed successfully. + Ok, + /// The pull failed. + Error { error: String }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn host_info_prefix_formats_index() { + assert_eq!( + HostInfo { + host_idx: 0, + n_hosts: 2 + } + .prefix(), + "host0" + ); + assert_eq!( + HostInfo { + host_idx: 1, + n_hosts: 2 + } + .prefix(), + "host1" + ); + assert_eq!( + HostInfo { + host_idx: 42, + n_hosts: 100 + } + .prefix(), + "host42" + ); + } +} diff --git a/crates/feldera-types/src/config.rs b/crates/feldera-types/src/config.rs index 487a5c647a7..d88046627ec 100644 --- a/crates/feldera-types/src/config.rs +++ b/crates/feldera-types/src/config.rs @@ -528,19 +528,8 @@ pub struct SyncConfig { /// Default: 10 pub upload_concurrency: Option, - /// When `true`, the pipeline starts in **standby** mode; processing doesn't - /// start until activation (`POST /activate`). - /// If this pipeline was previously activated and the storage has not been - /// cleared, the pipeline will auto activate, no newer checkpoints will be - /// fetched. - /// - /// Standby behavior depends on `start_from_checkpoint`: - /// - If `latest`, pipeline continuously fetches the latest available - /// checkpoint until activated. - /// - If checkpoint UUID, pipeline fetches this checkpoint once and waits - /// in standby until activated. - /// - /// Default: `false` + /// **Deprecated.** Use `initial=standby` when starting the pipeline instead. + #[deprecated(note = "Use `initial=standby` when starting the pipeline instead.")] #[schema(default = std::primitive::bool::default)] #[serde(default)] pub standby: bool, @@ -615,10 +604,11 @@ fn default_retention_min_age() -> u32 { impl SyncConfig { pub fn validate(&self) -> Result<(), String> { - if self.standby && self.start_from_checkpoint.is_none() { - return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set. -Standby mode requires `start_from_checkpoint` to be set. -Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned()); + #[allow(deprecated)] + if self.standby { + return Err( + "The `standby` config field has been deprecated. Use `initial=standby` when starting the pipeline instead.".to_owned() + ); } if let Some(ref rb) = self.read_bucket diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index 10b0d62c401..afa27cc1611 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -1400,6 +1400,10 @@ pub(crate) async fn get_checkpoint_sync_status( /// Get the checkpoints for a pipeline /// /// Retrieve the current checkpoints made by a pipeline. +/// +/// **Stability note**: for multihost pipelines, this endpoint returns the +/// combined checkpoint list from all hosts. The shape of this response may +/// change in a future release. #[utoipa::path( context_path = "/v0", security(("JSON web token (JWT) or API key" = [])), @@ -1408,7 +1412,10 @@ pub(crate) async fn get_checkpoint_sync_status( ), responses( (status = OK - , description = "Checkpoints retrieved successfully" + , description = "Checkpoints retrieved successfully. \ + For multihost pipelines the list contains entries \ + from all hosts; the shape of this response may \ + change in a future release." , content_type = "application/json" , body = CheckpointMetadata), (status = NOT_FOUND diff --git a/crates/storage/src/checkpoint_synchronizer.rs b/crates/storage/src/checkpoint_synchronizer.rs index da048aca7e0..ef8769c46c3 100644 --- a/crates/storage/src/checkpoint_synchronizer.rs +++ b/crates/storage/src/checkpoint_synchronizer.rs @@ -1,24 +1,41 @@ use std::sync::{Arc, LazyLock}; use feldera_types::{ - checkpoint::{CheckpointMetadata, CheckpointSyncMetrics}, + checkpoint::{CheckpointMetadata, CheckpointSyncMetrics, HostInfo}, config::SyncConfig, }; use crate::StorageBackend; pub trait CheckpointSynchronizer: Sync { + /// Push a checkpoint to remote object storage. + /// + /// `host_info` identifies the calling host within a multihost pipeline. + /// When `Some`, the checkpoint zip and catalog are written under + /// `host{N}/` in the remote bucket; when `None` (solo pipeline), the + /// existing flat layout is used for backward compatibility. fn push( &self, checkpoint: uuid::Uuid, storage: Arc, remote_config: SyncConfig, + host_info: Option, ) -> anyhow::Result>; + /// Pull a checkpoint from remote object storage. + /// + /// `host_info` scopes the pull to the correct `host{N}/` subdirectory. + /// Pass `None` for solo pipelines to use the existing flat layout. + /// + /// `standby` indicates that the pipeline is in standby mode: the + /// local-storage cache is bypassed (always pull from remote) and a missing + /// remote checkpoint is treated as an error rather than a fresh start. fn pull( &self, storage: Arc, remote_config: SyncConfig, + host_info: Option, + standby: bool, ) -> anyhow::Result<(CheckpointMetadata, Option)>; } diff --git a/openapi.json b/openapi.json index d8e46664999..eb432488bc5 100644 --- a/openapi.json +++ b/openapi.json @@ -2540,7 +2540,7 @@ "Pipeline Lifecycle" ], "summary": "Get the checkpoints for a pipeline", - "description": "Retrieve the current checkpoints made by a pipeline.", + "description": "Retrieve the current checkpoints made by a pipeline.\n\n**Stability note**: for multihost pipelines, this endpoint returns the\ncombined checkpoint list from all hosts. The shape of this response may\nchange in a future release.", "operationId": "get_checkpoints", "parameters": [ { @@ -2555,7 +2555,7 @@ ], "responses": { "200": { - "description": "Checkpoints retrieved successfully", + "description": "Checkpoints retrieved successfully. For multihost pipelines the list contains entries from all hosts; the shape of this response may change in a future release.", "content": { "application/json": { "schema": { @@ -13490,8 +13490,9 @@ }, "standby": { "type": "boolean", - "description": "When `true`, the pipeline starts in **standby** mode; processing doesn't\nstart until activation (`POST /activate`).\nIf this pipeline was previously activated and the storage has not been\ncleared, the pipeline will auto activate, no newer checkpoints will be\nfetched.\n\nStandby behavior depends on `start_from_checkpoint`:\n- If `latest`, pipeline continuously fetches the latest available\ncheckpoint until activated.\n- If checkpoint UUID, pipeline fetches this checkpoint once and waits\nin standby until activated.\n\nDefault: `false`", - "default": false + "description": "**Deprecated.** Use `initial=standby` when starting the pipeline instead.", + "default": false, + "deprecated": true }, "start_from_checkpoint": { "allOf": [ diff --git a/python/tests/platform/test_checkpoint_sync.py b/python/tests/platform/test_checkpoint_sync.py index 84acc5a7560..1d1ebd56e84 100644 --- a/python/tests/platform/test_checkpoint_sync.py +++ b/python/tests/platform/test_checkpoint_sync.py @@ -69,45 +69,45 @@ def storage_cfg( class TestCheckpointSync(SharedTestPipeline): - @enterprise_only - @single_host_only - def test_checkpoint_sync( + def _wait_for_standby_checkpoint_pull(self, pipeline, timeout_s: float = 120): + end = time.monotonic() + timeout_s + for log in pipeline.logs(): + if "checkpoint pulled successfully" in log: + return + if time.monotonic() > end: + raise TimeoutError( + f"{pipeline.name} timed out waiting to pull checkpoint" + ) + + def _configure_and_start( self, - from_uuid: bool = False, - random_uuid: bool = False, - clear_storage: bool = True, - auth_err: bool = False, - strict: bool = False, - expect_empty: bool = False, - standby: bool = False, ft_interval: int = 60, - automated_checkpoint: bool = False, - automated_sync_interval: Optional[int] = None, + push_interval: Optional[int] = None, + retention_min_age: int = 0, ): - """ - CREATE TABLE t0 (c0 INT, c1 VARCHAR); - CREATE MATERIALIZED VIEW v0 AS SELECT * FROM t0; - """ - + """Configure the pipeline with AtLeastOnce FT and start it.""" storage_config = storage_cfg( self.pipeline.name, - push_interval=automated_sync_interval, + push_interval=push_interval, retention_min_count=1, - retention_min_age=5 if from_uuid else 0, + retention_min_age=retention_min_age, ) - ft = FaultToleranceModel.AtLeastOnce - self.pipeline.set_runtime_config( RuntimeConfig( workers=FELDERA_TEST_NUM_WORKERS, hosts=FELDERA_TEST_NUM_HOSTS, - fault_tolerance_model=ft, + fault_tolerance_model=FaultToleranceModel.AtLeastOnce, storage=Storage(config=storage_config), checkpoint_interval_secs=ft_interval, ) ) self.pipeline.start() + def _insert_data_and_wait(self): + """Insert random rows and block until the pipeline processes them all. + + Returns (processed, got_before) where got_before is the view snapshot. + """ random.seed(time.time()) total = random.randint(10, 20) data = [{"c0": i, "c1": str(i)} for i in range(1, total)] @@ -116,17 +116,14 @@ def test_checkpoint_sync( start = time.monotonic() timeout = 5 - while True: processed = self.pipeline.stats().global_metrics.total_processed_records if processed == total: break - if time.monotonic() - start > timeout: raise TimeoutError( f"timed out while waiting for pipeline to process {total} records" ) - time.sleep(0.1) got_before = list(self.pipeline.query("SELECT * FROM v0")) @@ -134,78 +131,117 @@ def test_checkpoint_sync( if len(got_before) != processed: raise RuntimeError( - f"adhoc query returned {len(got_before)} but {processed} records were processed: {got_before}" + f"adhoc query returned {len(got_before)} but {processed} records were " + f"processed: {got_before}" ) - chk_uuid = None + return processed, got_before - if not automated_checkpoint: - self.pipeline.checkpoint(wait=True) - else: - # Wait for at least one automated checkpoint to be created with current data. - chk_uuid_holder = {"value": None} + def _wait_for_automated_checkpoint(self, processed): + """Poll until an automated checkpoint covers all *processed* records. - def checkpoint_created() -> bool: - chks = self.pipeline.checkpoints() - chk = next((x for x in chks if x.processed_records == processed), None) - if chk is None: - return False - chk_uuid_holder["value"] = chk.uuid - return True - - wait_for_condition( - "automated checkpoint is created for current processed records", - checkpoint_created, - timeout_s=30.0, - poll_interval_s=0.5, - ) - chk_uuid = chk_uuid_holder["value"] + In multihost pipelines UUIDs are generated independently per pod and + cannot be compared across pods, so we track steps instead. - print("Checkpoint UUID:", chk_uuid, file=sys.stderr) - time.sleep(1) + Returns (chk_uuid, chk_steps). + """ + chk_uuid = None + chk_steps = None + + def checkpoint_created() -> bool: + nonlocal chk_uuid, chk_steps + chks = self.pipeline.checkpoints() + # Group by steps so that multihost totals are summed correctly. + by_steps: dict = {} + for c in chks: + by_steps.setdefault(c.steps, []).append(c) + for steps_val, step_chks in by_steps.items(): + step_total = sum(c.processed_records for c in step_chks) + print( + f"Total: {step_total}, chks: {[chk.to_dict() for chk in step_chks]}", + file=sys.stderr, + ) + # processed_records is partitioned across pods (not replicated), + # so summing per-pod values gives the global total. + if step_total == processed: + chk_uuid = step_chks[0].uuid + chk_steps = steps_val + return True + return False + + wait_for_condition( + "automated checkpoint is created for current processed records", + checkpoint_created, + timeout_s=30.0, + poll_interval_s=0.5, + ) + return chk_uuid, chk_steps + + def _checkpoint_steps(self, synced_uuid) -> Optional[int]: + """Return the step count for *synced_uuid*, or None if not found.""" + return next( + ( + c.steps + for c in self.pipeline.checkpoints() + if str(c.uuid) == str(synced_uuid) + ), + None, + ) - if automated_sync_interval is not None: + def _wait_for_automated_sync(self, chk_uuid=None, chk_steps=None): + """Poll until the periodic sync has uploaded a checkpoint at least as recent as *chk_uuid*.""" - def checkpoint_sync_completed() -> bool: - try: - synced = self.pipeline.last_successful_checkpoint_sync() - print( - "Automatically synced checkpoint UUID:", synced, file=sys.stderr - ) - if synced is not None and chk_uuid is not None: - if synced >= UUID(chk_uuid): - return True - return False - if synced is not None: - return True - return False - except RuntimeError: + def checkpoint_sync_completed() -> bool: + try: + synced = self.pipeline.last_successful_checkpoint_sync() + print("Automatically synced checkpoint UUID:", synced, file=sys.stderr) + if synced is None: return False + if chk_uuid is None: + return True + s_steps = self._checkpoint_steps(synced) + if chk_steps is not None and s_steps is not None: + return s_steps >= chk_steps + return UUID(str(synced)) >= UUID(str(chk_uuid)) + except RuntimeError: + return False - wait_for_condition( - "automated checkpoint sync completes", - checkpoint_sync_completed, - timeout_s=30.0, - poll_interval_s=0.5, - ) - else: - uuid = self.pipeline.sync_checkpoint(wait=True) - print("Synced Checkpoint UUID:", uuid, file=sys.stderr) - if chk_uuid is not None: - assert UUID(uuid) >= UUID(chk_uuid) + wait_for_condition( + "automated checkpoint sync completes", + checkpoint_sync_completed, + timeout_s=30.0, + poll_interval_s=0.5, + ) + def _sync_and_verify(self, chk_uuid=None, chk_steps=None): + """Trigger a manual sync and assert it covers *chk_uuid*. Returns the synced UUID.""" + uuid = self.pipeline.sync_checkpoint(wait=True) + print("Synced Checkpoint UUID:", uuid, file=sys.stderr) + if chk_uuid is not None: + s_steps = self._checkpoint_steps(uuid) + if chk_steps is not None and s_steps is not None: + assert s_steps >= chk_steps + else: + assert UUID(str(uuid)) >= UUID(str(chk_uuid)) + return uuid + + def _restart_from_checkpoint( + self, + start_from, + ft_interval: int = 60, + auth_err: bool = False, + strict: bool = False, + standby: bool = False, + clear_storage: bool = True, + ): + """Stop the running pipeline and restart it from *start_from*.""" self.pipeline.stop(force=True) - if clear_storage: self.pipeline.clear_storage() - if random_uuid: - uuid = uuid4() - - # Restart pipeline from checkpoint storage_config = storage_cfg( pipeline_name=self.pipeline.name, - start_from_checkpoint=uuid if from_uuid else "latest", + start_from_checkpoint=start_from, auth_err=auth_err, strict=strict, standby=standby, @@ -214,7 +250,7 @@ def checkpoint_sync_completed() -> bool: RuntimeConfig( workers=FELDERA_TEST_NUM_WORKERS, hosts=FELDERA_TEST_NUM_HOSTS, - fault_tolerance_model=ft, + fault_tolerance_model=FaultToleranceModel.AtLeastOnce, storage=Storage(config=storage_config), checkpoint_interval_secs=ft_interval, ) @@ -224,72 +260,225 @@ def checkpoint_sync_completed() -> bool: self.pipeline.start() else: self.pipeline.start_standby() + self._wait_for_standby_checkpoint_pull(self.pipeline, timeout_s=120) + assert self.pipeline.status() == PipelineStatus.STANDBY + self.pipeline.activate() - # wait for the pipeline to initialize - start = time.monotonic() - # wait for a maximum of 120 seconds for the pipeline to provison - end = start + 120 + # ========================================================================= + # Tests + # ========================================================================= - # wait for the pipeline to finish provisoning - for log in self.pipeline.logs(): - if "checkpoint pulled successfully" in log: - break + @enterprise_only + def test_checkpoint_sync(self): + """ + CREATE TABLE t0 (c0 INT, c1 VARCHAR); + CREATE MATERIALIZED VIEW v0 AS SELECT * FROM t0; + """ + self._configure_and_start() + _, got_before = self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() - if time.monotonic() > end: - raise TimeoutError( - f"{self.pipeline.name} timed out waiting to pull checkpoint" - ) + self._restart_from_checkpoint("latest") - if standby: - assert self.pipeline.status() == PipelineStatus.STANDBY - self.pipeline.activate() + got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, + ) + self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + self.pipeline.clear_storage() + + @enterprise_only + @single_host_only + def test_from_uuid(self): + # retention_min_age prevents the checkpoint from being garbage-collected + # before the pipeline restarts from it. + self._configure_and_start(retention_min_age=5) + _, got_before = self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + uuid = self._sync_and_verify() + + self._restart_from_checkpoint(uuid) + + got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, + ) + self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + self.pipeline.clear_storage() + + @enterprise_only + def test_without_clearing_storage(self): + self._configure_and_start() + _, got_before = self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() + + self._restart_from_checkpoint("latest", clear_storage=False) got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, + ) + self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + + @enterprise_only + def test_automated_checkpoint(self): + self._configure_and_start(ft_interval=5) + processed, got_before = self._insert_data_and_wait() + chk_uuid, chk_steps = self._wait_for_automated_checkpoint(processed) + time.sleep(1) + self._sync_and_verify(chk_uuid, chk_steps) + + self._restart_from_checkpoint("latest") + got_after = list(self.pipeline.query("SELECT * FROM v0")) print( f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", file=sys.stderr, ) + self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + self.pipeline.clear_storage() + + @enterprise_only + def test_automated_checkpoint_sync(self): + self._configure_and_start(ft_interval=5, push_interval=10) + processed, got_before = self._insert_data_and_wait() + chk_uuid, chk_steps = self._wait_for_automated_checkpoint(processed) + time.sleep(1) + self._wait_for_automated_sync(chk_uuid, chk_steps) - if expect_empty: - got_before = [] + self._restart_from_checkpoint("latest") + got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, + ) self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + self.pipeline.clear_storage() + + @enterprise_only + def test_automated_checkpoint_sync1(self): + # Manual checkpoint, automated sync. + self._configure_and_start(ft_interval=5, push_interval=10) + _, got_before = self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._wait_for_automated_sync() + self._restart_from_checkpoint("latest") + + got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, + ) + self.assertCountEqual(got_before, got_after) self.pipeline.stop(force=True) + self.pipeline.clear_storage() - if clear_storage: - self.pipeline.clear_storage() + @enterprise_only + def test_autherr_fail(self): + self._configure_and_start() + self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() + + with self.assertRaisesRegex(RuntimeError, "SignatureDoesNotMatch|Forbidden"): + self._restart_from_checkpoint("latest", auth_err=True, strict=True) @enterprise_only - @single_host_only - def test_from_uuid(self): - self.test_checkpoint_sync(from_uuid=True) + def test_autherr(self): + self._configure_and_start() + self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() + + with self.assertRaisesRegex(RuntimeError, "SignatureDoesNotMatch|Forbidden"): + self._restart_from_checkpoint("latest", auth_err=True, strict=False) @enterprise_only @single_host_only - def test_without_clearing_storage(self): - self.test_checkpoint_sync(clear_storage=False) + def test_nonexistent_checkpoint_fail(self): + self._configure_and_start() + self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() + + with self.assertRaisesRegex(RuntimeError, "were not found in source"): + self._restart_from_checkpoint(uuid4(), strict=True) @enterprise_only @single_host_only - def test_automated_checkpoint(self): - self.test_checkpoint_sync(ft_interval=5, automated_checkpoint=True) + def test_nonexistent_checkpoint(self): + self._configure_and_start() + self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() + + self._restart_from_checkpoint(uuid4(), strict=False) + + got_after = list(self.pipeline.query("SELECT * FROM v0")) + self.assertEqual(got_after, []) # pipeline started fresh: no data + self.pipeline.stop(force=True) + self.pipeline.clear_storage() @enterprise_only - @single_host_only - def test_automated_checkpoint_sync(self): - self.test_checkpoint_sync( - ft_interval=5, automated_checkpoint=True, automated_sync_interval=10 + def test_standby_activation(self): + self._configure_and_start() + _, got_before = self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + self._sync_and_verify() + + self._restart_from_checkpoint("latest", standby=True) + + got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, ) + self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + self.pipeline.clear_storage() @enterprise_only @single_host_only - def test_automated_checkpoint_sync1(self): - self.test_checkpoint_sync(ft_interval=5, automated_sync_interval=10) + def test_standby_activation_from_uuid(self): + self._configure_and_start(retention_min_age=5) + _, got_before = self._insert_data_and_wait() + self.pipeline.checkpoint(wait=True) + time.sleep(1) + uuid = self._sync_and_verify() + + self._restart_from_checkpoint(uuid, standby=True) + + got_after = list(self.pipeline.query("SELECT * FROM v0")) + print( + f"{self.pipeline.name}: after: {len(got_after)}, {got_after}", + file=sys.stderr, + ) + self.assertCountEqual(got_before, got_after) + self.pipeline.stop(force=True) + self.pipeline.clear_storage() @enterprise_only - @single_host_only def test_automated_sync_auth_error(self): """ CREATE TABLE t0 (c0 INT, c1 VARCHAR); @@ -346,42 +535,6 @@ def checkpoint_exists() -> bool: self.pipeline.clear_storage() @enterprise_only - @single_host_only - def test_autherr_fail(self): - with self.assertRaisesRegex(RuntimeError, "SignatureDoesNotMatch|Forbidden"): - self.test_checkpoint_sync(auth_err=True, strict=True) - - @enterprise_only - @single_host_only - def test_autherr(self): - with self.assertRaisesRegex(RuntimeError, "SignatureDoesNotMatch|Forbidden"): - self.test_checkpoint_sync(auth_err=True, strict=False) - - @enterprise_only - @single_host_only - def test_nonexistent_checkpoint_fail(self): - with self.assertRaisesRegex(RuntimeError, "were not found in source"): - self.test_checkpoint_sync(random_uuid=True, from_uuid=True, strict=True) - - @enterprise_only - @single_host_only - def test_nonexistent_checkpoint(self): - self.test_checkpoint_sync( - random_uuid=True, from_uuid=True, strict=False, expect_empty=True - ) - - @enterprise_only - @single_host_only - def test_standby_activation(self): - self.test_checkpoint_sync(standby=True) - - @enterprise_only - @single_host_only - def test_standby_activation_from_uuid(self): - self.test_checkpoint_sync(standby=True, from_uuid=True) - - @enterprise_only - @single_host_only def test_standby_fallback(self, from_uuid: bool = False): # Step 1: Start main pipeline storage_config = storage_cfg(self.pipeline.name, retention_min_age=1) @@ -430,16 +583,7 @@ def test_standby_fallback(self, from_uuid: bool = False): ) standby.start_standby() - # Wait until standby pulls the first checkpoint - start = time.monotonic() - end = start + 120 - for log in standby.logs(): - if "checkpoint pulled successfully" in log: - break - if time.monotonic() > end: - raise TimeoutError( - "Timed out waiting for standby pipeline to pull checkpoint" - ) + self._wait_for_standby_checkpoint_pull(standby, timeout_s=120) # Step 4: Add more data and make 3-10 checkpoints extra_ckpts = random.randint(3, 10) @@ -469,12 +613,6 @@ def test_standby_fallback(self, from_uuid: bool = False): assert standby.status() == PipelineStatus.STANDBY standby.activate(timeout_s=(pull_interval * extra_ckpts) + 60) - for log in standby.logs(): - if "activated" in log: - break - if time.monotonic() > end: - raise TimeoutError("Timed out waiting for standby pipeline to activate") - got_after = list(standby.query("SELECT * FROM v0")) # Cleanup @@ -505,7 +643,6 @@ def test_standby_fallback_from_uuid(self): # ------------------------------------------------------------------------- @enterprise_only - @single_host_only def test_local_checkpoint_priority(self): # After syncing checkpoint A to S3, taking a local-only checkpoint B # (without syncing), and restarting without clearing storage, @@ -577,7 +714,6 @@ def test_local_checkpoint_priority(self): # ------------------------------------------------------------------------- @enterprise_only - @single_host_only def test_read_bucket( self, standby: bool = False, @@ -637,15 +773,7 @@ def test_read_bucket( self.pipeline.start_standby() try: - start = time.monotonic() - end = start + 120 - for log in self.pipeline.logs(): - if "checkpoint pulled successfully" in log: - break - if time.monotonic() > end: - raise TimeoutError( - "Timed out waiting for standby pipeline to pull from read_bucket" - ) + self._wait_for_standby_checkpoint_pull(self.pipeline, timeout_s=120) assert self.pipeline.status() == PipelineStatus.STANDBY self.pipeline.activate() @@ -673,7 +801,6 @@ def test_read_bucket_from_uuid(self): self.test_read_bucket(from_uuid=True) @enterprise_only - @single_host_only def test_read_bucket_standby(self): # Standby pipeline seeds from read_bucket when bucket is empty. self.test_read_bucket(standby=True) @@ -685,7 +812,6 @@ def test_read_bucket_standby_from_uuid(self): self.test_read_bucket(standby=True, from_uuid=True) @enterprise_only - @single_host_only def test_bucket_preferred_over_read_bucket(self): # When both bucket and read_bucket hold checkpoints, the pipeline uses # bucket (its own checkpoint), not read_bucket. @@ -760,7 +886,6 @@ def test_bucket_preferred_over_read_bucket(self): source.clear_storage() @enterprise_only - @single_host_only def test_standby_bucket_takes_over_from_read_bucket(self): # In standby mode, when bucket is initially empty the pipeline falls back # to read_bucket. Once the main pipeline pushes a newer checkpoint to @@ -822,15 +947,7 @@ def test_standby_bucket_takes_over_from_read_bucket(self): standby.start_standby() # Wait for standby to pull at least one checkpoint (from read_bucket). - start = time.monotonic() - end = start + 120 - for log in standby.logs(): - if "checkpoint pulled successfully" in log: - break - if time.monotonic() > end: - raise TimeoutError( - "Timed out waiting for standby to pull from read_bucket" - ) + self._wait_for_standby_checkpoint_pull(standby, timeout_s=120) # Step 4: main pipeline inserts data and pushes multiple checkpoints # to its own bucket. @@ -956,7 +1073,6 @@ def test_local_checkpoint_priority_from_uuid(self): self.pipeline.clear_storage() @enterprise_only - @single_host_only def test_local_priority_over_read_bucket(self): # Local checkpoint wins over read_bucket even when the primary S3 bucket # is empty. Priority order: local > bucket > read_bucket. @@ -1127,7 +1243,6 @@ def test_local_priority_over_read_bucket_from_uuid(self): source.clear_storage() @enterprise_only - @single_host_only def test_read_bucket_strict_fail(self): # When fail_if_no_checkpoint=True and both the primary bucket and # read_bucket are empty, the pipeline must fail to start. diff --git a/python/tests/platform/test_connector_status.py b/python/tests/platform/test_connector_status.py index d44179a92d9..de5886ea2b3 100644 --- a/python/tests/platform/test_connector_status.py +++ b/python/tests/platform/test_connector_status.py @@ -197,8 +197,10 @@ def test_http_connector_status_across_restart_and_sql_changes(pipeline_name): wait_for_condition( "after table modification no http connectors remain", - lambda: len(_http_connector_names(pipeline)[0]) == 0 - and len(_http_connector_names(pipeline)[1]) == 0, + lambda: ( + len(_http_connector_names(pipeline)[0]) == 0 + and len(_http_connector_names(pipeline)[1]) == 0 + ), timeout_s=60.0, poll_interval_s=1.0, ) diff --git a/python/tests/platform/test_metrics_logs.py b/python/tests/platform/test_metrics_logs.py index 6ebd5d3be86..64427854efd 100644 --- a/python/tests/platform/test_metrics_logs.py +++ b/python/tests/platform/test_metrics_logs.py @@ -215,10 +215,10 @@ def test_pipeline_logs(pipeline_name): # Poll for logs availability wait_for_condition( "logs endpoint becomes available", - lambda: get( - api_url(f"/pipelines/{pipeline_name}/logs"), stream=True - ).status_code - == HTTPStatus.OK, + lambda: ( + get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True).status_code + == HTTPStatus.OK + ), timeout_s=30.0, poll_interval_s=0.5, ) @@ -262,10 +262,10 @@ def test_pipeline_logs(pipeline_name): # Poll until logs become unavailable (404) wait_for_condition( "logs endpoint becomes unavailable after deletion", - lambda: get( - api_url(f"/pipelines/{pipeline_name}/logs"), stream=True - ).status_code - == HTTPStatus.NOT_FOUND, + lambda: ( + get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True).status_code + == HTTPStatus.NOT_FOUND + ), timeout_s=30.0, poll_interval_s=0.5, ) diff --git a/python/tests/platform/test_orchestration.py b/python/tests/platform/test_orchestration.py index 620e545c7fc..d831354516f 100644 --- a/python/tests/platform/test_orchestration.py +++ b/python/tests/platform/test_orchestration.py @@ -81,10 +81,10 @@ def test_pipeline_orchestration_basic(pipeline_name): # valuable test to pass. wait_for_condition( f"pipeline stats for {cur_pipeline_name} are available", - lambda: get( - api_url(f"/pipelines/{cur_pipeline_name}/stats") - ).status_code - == HTTPStatus.OK, + lambda: ( + get(api_url(f"/pipelines/{cur_pipeline_name}/stats")).status_code + == HTTPStatus.OK + ), timeout_s=30.0, poll_interval_s=1.0, ) @@ -129,9 +129,11 @@ def test_pipeline_orchestration_basic(pipeline_name): assert resp.status_code == HTTPStatus.OK, (resp.status_code, resp.text) wait_for_condition( "connector start observed", - lambda: not _basic_orchestration_info( - cur_pipeline_name, table_name, connector_name - )[1], + lambda: ( + not _basic_orchestration_info( + cur_pipeline_name, table_name, connector_name + )[1] + ), timeout_s=10.0, poll_interval_s=0.5, ) diff --git a/python/tests/platform/test_pipeline_configs.py b/python/tests/platform/test_pipeline_configs.py index 867b2995bfc..580f8bbb881 100644 --- a/python/tests/platform/test_pipeline_configs.py +++ b/python/tests/platform/test_pipeline_configs.py @@ -45,10 +45,12 @@ def test_pipeline_runtime_config(pipeline_name): "storage_class": "normal", }, }, - lambda rc: rc.get("workers") == 100 - and rc.get("resources", {}).get("cpu_cores_min") == 5 - and rc.get("resources", {}).get("storage_mb_max") == 2000 - and rc.get("resources", {}).get("storage_class") == "normal", + lambda rc: ( + rc.get("workers") == 100 + and rc.get("resources", {}).get("cpu_cores_min") == 5 + and rc.get("resources", {}).get("storage_mb_max") == 2000 + and rc.get("resources", {}).get("storage_class") == "normal" + ), ), ( {"env": {"TEST_ENV": "value"}}, diff --git a/python/tests/platform/test_pipeline_lifecycle.py b/python/tests/platform/test_pipeline_lifecycle.py index 23da8a75609..2369162c88e 100644 --- a/python/tests/platform/test_pipeline_lifecycle.py +++ b/python/tests/platform/test_pipeline_lifecycle.py @@ -138,8 +138,10 @@ def test_pipeline_start_without_compiling(pipeline_name): pipeline = Pipeline.get(pipeline_name, TEST_CLIENT) wait_for_condition( "program status moves past Pending/CompilingSql", - lambda: pipeline.program_status() - not in (ProgramStatus.Pending, ProgramStatus.CompilingSql), + lambda: ( + pipeline.program_status() + not in (ProgramStatus.Pending, ProgramStatus.CompilingSql) + ), timeout_s=1800.0, poll_interval_s=1.0, ) diff --git a/python/tests/platform/test_shared_pipeline.py b/python/tests/platform/test_shared_pipeline.py index 4fce5a53258..4d62f9673a8 100644 --- a/python/tests/platform/test_shared_pipeline.py +++ b/python/tests/platform/test_shared_pipeline.py @@ -347,8 +347,10 @@ def test_failed_pipeline_stop(self): self.pipeline.input_json("tbl", data, wait=False) wait_for_condition( "pipeline stops with deployment error after worker panic", - lambda: self.pipeline.status() == PipelineStatus.STOPPED - and len(self.pipeline.deployment_error()) > 0, + lambda: ( + self.pipeline.status() == PipelineStatus.STOPPED + and len(self.pipeline.deployment_error()) > 0 + ), timeout_s=20.0, poll_interval_s=1.0, ) diff --git a/python/tests/runtime/test_connector_status.py b/python/tests/runtime/test_connector_status.py index 5c5a1a1b7fc..26043bc51a9 100644 --- a/python/tests/runtime/test_connector_status.py +++ b/python/tests/runtime/test_connector_status.py @@ -237,8 +237,10 @@ def output_connector_status() -> OutputEndpointStatus: wait_for_condition( "output connector encode error count reaches 500", - lambda: output_connector_status().metrics.num_encode_errors == 500 - and len(output_connector_status().encode_errors) == 100, + lambda: ( + output_connector_status().metrics.num_encode_errors == 500 + and len(output_connector_status().encode_errors) == 100 + ), timeout_s=100.0, poll_interval_s=1.0, ) @@ -264,8 +266,10 @@ def output_connector_status() -> OutputEndpointStatus: wait_for_condition( "input connector parse error count reaches 500", - lambda: input_connector_status().metrics.num_parse_errors == 500 - and len(input_connector_status().parse_errors) == 100, + lambda: ( + input_connector_status().metrics.num_parse_errors == 500 + and len(input_connector_status().parse_errors) == 100 + ), timeout_s=100.0, poll_interval_s=1.0, ) diff --git a/python/tests/shared_test_pipeline.py b/python/tests/shared_test_pipeline.py index a1b500a5f64..afcfdaaddec 100644 --- a/python/tests/shared_test_pipeline.py +++ b/python/tests/shared_test_pipeline.py @@ -1,12 +1,13 @@ import unittest + +from feldera import Pipeline, PipelineBuilder from feldera.runtime_config import RuntimeConfig from feldera.testutils import ( - unique_pipeline_name, - FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS, + FELDERA_TEST_NUM_WORKERS, + unique_pipeline_name, ) from tests import TEST_CLIENT -from feldera import PipelineBuilder, Pipeline def sql(text_or_iterable): From 60695cd45744f9e826d0623ae78bf81de2fa68cf Mon Sep 17 00:00:00 2001 From: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> Date: Mon, 11 May 2026 09:17:08 +0000 Subject: [PATCH 2/2] docs: update docs for multihost S3 sync Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --- .../docs/pipelines/checkpoint-sync.md | 148 ++++++++++++------ 1 file changed, 104 insertions(+), 44 deletions(-) diff --git a/docs.feldera.com/docs/pipelines/checkpoint-sync.md b/docs.feldera.com/docs/pipelines/checkpoint-sync.md index 67f232c652a..e94bc2be0e7 100644 --- a/docs.feldera.com/docs/pipelines/checkpoint-sync.md +++ b/docs.feldera.com/docs/pipelines/checkpoint-sync.md @@ -49,10 +49,11 @@ seconds. ## Standby mode -Pipelines can start in **standby** mode by setting `standby` to `true`. A -standby pipeline does not process data. Instead, it continuously pulls the -latest checkpoint from object store at the interval specified by -`pull_interval`, staying ready for immediate activation. +Pipelines can start in **standby** mode by passing `initial=standby` to the +[start pipeline](/api/start-pipeline) endpoint. A standby +pipeline does not process data. Instead, it continuously pulls the latest +checkpoint from object store at the interval specified by `pull_interval`, +staying ready for immediate activation. To activate a standby pipeline: @@ -125,9 +126,9 @@ On its first start **B** pulls from `bucket-a/pipeline-a` (because | `secret_key` | `string` | | S3 secret key. Not required if using environment-based auth. | | `start_from_checkpoint` | `string` | | Checkpoint UUID to resume from, or `latest` to restore from the latest checkpoint. | | `fail_if_no_checkpoint` | `boolean` | `false` | When `true`, the pipeline fails to start if no checkpoint is found in any source (local storage, `bucket`, or `read_bucket`). When `false`, the pipeline starts from scratch instead. | -| `standby` | `boolean` | `false` | When `true`, the pipeline starts in **standby** mode. See [Standby mode](#standby-mode). `start_from_checkpoint` must be set to use standby mode. | +| `standby` | `boolean` | `false` | **Deprecated.** Use `initial=standby` when starting the pipeline instead. See [Standby mode](#standby-mode). | | `pull_interval` | `integer(u64)` | `10` | Interval (in seconds) between fetch attempts for the latest checkpoint while in standby. | -| `push_interval` | `integer(u64)` | | Interval (in seconds) between automatic syncs of a local checkpoint to object store, measured from the completion of the previous sync. Disabled by default. See [Automatic checkpoint synchronization](#automatic-checkpoint-synchronization). | +| `push_interval` | `integer(u64)` | | Interval (in seconds) between automatic syncs of a local checkpoint to object store, measured from the completion of the previous sync. Disabled by default. See [Automatic checkpoint synchronization](#automatic-checkpoint-synchronization). | | `transfers` | `integer (u8)` | `20` | Number of concurrent file transfers. | | `checkers` | `integer (u8)` | `20` | Number of parallel checkers for verification. | | `ignore_checksum` | `boolean` | `false` | Skip checksum verification after transfer and only check the file size. May improve throughput. | @@ -170,28 +171,31 @@ Example policy: ```json { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": { - "AWS": "arn:aws:iam::USER_SID:user/USER_NAME" - }, - "Action": [ - "s3:ListBucket", - "s3:DeleteObject", - "s3:GetObject", - "s3:PutObject", - "s3:PutObjectAcl" - ], - "Resource": ["arn:aws:s3:::BUCKET_NAME/*", "arn:aws:s3:::BUCKET_NAME"] - }, - { - "Effect": "Allow", - "Action": "s3:ListAllMyBuckets", - "Resource": "arn:aws:s3:::*" - } - ] + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": "arn:aws:iam::USER_SID:user/USER_NAME" + }, + "Action": [ + "s3:ListBucket", + "s3:DeleteObject", + "s3:GetObject", + "s3:PutObject", + "s3:PutObjectAcl" + ], + "Resource": [ + "arn:aws:s3:::BUCKET_NAME/*", + "arn:aws:s3:::BUCKET_NAME" + ] + }, + { + "Effect": "Allow", + "Action": "s3:ListAllMyBuckets", + "Resource": "arn:aws:s3:::*" + } + ] } ``` @@ -259,13 +263,14 @@ curl http://localhost/v0/pipelines/{PIPELINE_NAME}/checkpoint/sync_status ### Response fields -| Field | Type | Description | -| ---------- | --------------- | ------------------------------------------------------------------------------------------------------------ | -| `success` | `uuid \| null` | UUID of the most recently successful manually triggered checkpoint sync (`POST /checkpoint/sync`). | -| `failure` | `object \| null`| Details of the most recently failed manually triggered checkpoint sync. Contains `uuid` and `error` fields. | -| `periodic` | `uuid \| null` | UUID of the most recently successful automatic periodic checkpoint sync (configured via `push_interval`). | +| Field | Type | Description | +| ---------- | ---------------- | ----------------------------------------------------------------------------------------------------------- | +| `success` | `uuid \| null` | UUID of the most recently successful manually triggered checkpoint sync (`POST /checkpoint/sync`). | +| `failure` | `object \| null` | Details of the most recently failed manually triggered checkpoint sync. Contains `uuid` and `error` fields. | +| `periodic` | `uuid \| null` | UUID of the most recently successful automatic periodic checkpoint sync (configured via `push_interval`). | `success` and `periodic` track different sync mechanisms: + - `success` is updated only by manual syncs triggered via `POST /checkpoint/sync`. - `periodic` is updated only by automatic syncs configured via `push_interval`. @@ -280,38 +285,93 @@ curl http://localhost/v0/pipelines/{PIPELINE_NAME}/checkpoint/sync_status **Successful manual sync:** ```json -{ "success": "019779b4-8760-75f2-bdf0-71b825e63610", "failure": null, "periodic": null } +{ + "success": "019779b4-8760-75f2-bdf0-71b825e63610", + "failure": null, + "periodic": null +} ``` **Failed manual sync:** ```json { - "success": null, - "failure": { - "uuid": "019779c1-8317-7a71-bd78-7b971f4a3c43", - "error": "Error pushing checkpoint to object store: ... SignatureDoesNotMatch ..." - }, - "periodic": null + "success": null, + "failure": { + "uuid": "019779c1-8317-7a71-bd78-7b971f4a3c43", + "error": "Error pushing checkpoint to object store: ... SignatureDoesNotMatch ..." + }, + "periodic": null } ``` **Automatic periodic sync only (no manual syncs):** ```json -{ "success": null, "failure": null, "periodic": "019779c1-8317-7a71-bd78-7b971f4a3c43" } +{ + "success": null, + "failure": null, + "periodic": "019779c1-8317-7a71-bd78-7b971f4a3c43" +} ``` **Both manual and automatic syncs:** ```json { - "success": "019779b4-8760-75f2-bdf0-71b825e63610", - "failure": null, - "periodic": "019779c1-8317-7a71-bd78-7b971f4a3c43" + "success": "019779b4-8760-75f2-bdf0-71b825e63610", + "failure": null, + "periodic": "019779c1-8317-7a71-bd78-7b971f4a3c43" } ``` +## Multihost pipelines + +In a multihost pipeline, each host pod maintains its own subdirectory inside +the shared bucket (`host0/`, `host1/`, …). LSM tree reference files are stored +at the bucket root and shared across all hosts, since workers on different hosts +may reference the same files. + +### Remote layout + +``` +storage-bucket/ +├── w0-[UUID].feldera # LSM tree reference file (shared across hosts) +├── w1-[UUID].feldera # LSM tree reference file (shared across hosts) +├── dependencies/ # Per-checkpoint dependency manifests +│ ├── [UUID-host0-ckpt1].json +│ ├── [UUID-host1-ckpt1].json +│ └── ... +├── host0/ +│ ├── checkpoints.feldera # Checkpoint catalog for host 0 +│ ├── [UUID-host0-ckpt1].zip # Host 0 checkpoint data +│ └── ... +└── host1/ + ├── checkpoints.feldera # Checkpoint catalog for host 1 + ├── [UUID-host1-ckpt1].zip # Host 1 checkpoint data + └── ... +``` + +### How sync works + +The coordinator manages checkpoint sync for multihost pipelines. Before +triggering a sync, the coordinator selects the same logical step across all +host pods, ensuring their remote catalogs always represent a consistent +snapshot. Each pod then syncs its own subdirectory independently. + +:::important +The remote bucket must contain checkpoints written by a pipeline with the +**same number of hosts**. Restoring into a pipeline with a different host count +will cause the pipeline to fail. +::: + +### Limitations + +- **`start_from_checkpoint` supports only `"latest"`** for multihost pipelines. + Restoring from a specific checkpoint UUID is not supported. +- **Garbage collection** for multihost pipelines is not yet implemented. + Checkpoints accumulate in the bucket until GC support is added. + ## Buckets with server side encryption If the bucket has server side encryption enabled, set the flag