Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use feldera_types::adapter_stats::{
ConnectorHealth, ExternalControllerStatus, ExternalInputEndpointStatus,
ExternalOutputEndpointStatus,
};
use feldera_types::checkpoint::{CheckpointActivity, CheckpointMetadata};
use feldera_types::checkpoint::{CheckpointActivity, CheckpointMetadata, HostInfo};
use feldera_types::coordination::{
self, AdHocCatalog, AdHocTableType, CheckpointCoordination, Completion, StepAction, StepInputs,
StepRequest, StepStatus, TransactionCoordination,
Expand Down Expand Up @@ -143,7 +143,7 @@ mod error;
mod journal;
mod pipeline_diff;
mod stats;
mod sync;
pub(crate) mod sync;
mod validate;

use crate::adhoc::table::AdHocTable;
Expand Down Expand Up @@ -287,7 +287,7 @@ impl ControllerBuilder {
pub(crate) fn pull_once(&self, _sync: &SyncConfig) -> Result<(), ControllerError> {
#[cfg(feature = "feldera-enterprise")]
if let Some(storage) = &self.storage {
return sync::pull_once(storage, _sync);
return sync::pull_once(storage, _sync, None);
};

Ok(())
Expand All @@ -300,7 +300,7 @@ impl ControllerBuilder {
{
#[cfg(feature = "feldera-enterprise")]
if let Some(storage) = &self.storage {
sync::continuous_pull(storage, _is_activated)
sync::continuous_pull(storage, _is_activated, None)
} else {
Err(ControllerError::InvalidStandby(
"standby mode requires storage configuration",
Expand Down Expand Up @@ -351,6 +351,17 @@ impl ControllerBuilder {
pub(crate) fn storage(&self) -> Option<Arc<dyn StorageBackend>> {
self.storage.as_ref().map(|storage| storage.backend.clone())
}

/// Returns the sync configuration, if one is present in the storage backend.
pub(crate) fn sync_config(&self) -> Option<SyncConfig> {
self.storage.as_ref().and_then(|s| {
if let StorageBackendConfig::File(ref file_cfg) = s.options.backend {
file_cfg.sync.clone()
} else {
None
}
})
}
}

/// Controller that coordinates the creation, reconfiguration, teardown of
Expand Down Expand Up @@ -2201,11 +2212,12 @@ struct CheckpointSyncThread {
uuid: uuid::Uuid,
storage: Arc<dyn StorageBackend>,
config: SyncConfig,
host_info: Option<HostInfo>,
}

impl CheckpointSyncThread {
fn run(self) -> Result<(), Arc<ControllerError>> {
match SYNCHRONIZER.push(self.uuid, self.storage, self.config) {
match SYNCHRONIZER.push(self.uuid, self.storage, self.config, self.host_info) {
Err(err) => {
CHECKPOINT_SYNC_PUSH_FAILURES.fetch_add(1, Ordering::Relaxed);
Err(Arc::new(ControllerError::checkpoint_push_error(
Expand Down Expand Up @@ -2291,6 +2303,7 @@ impl RunningCheckpointSync {
),
))?,
config: sync.to_owned(),
host_info: circuit.controller.layout.host_info(),
};
let unparker = circuit.parker.unparker().clone();
let join_handle = std::thread::Builder::new()
Expand Down
71 changes: 54 additions & 17 deletions crates/adapters/src/controller/sync.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
#![allow(unused_imports)]
#[cfg(feature = "feldera-enterprise")]
use anyhow::Context;
use std::sync::{
Arc, LazyLock, Mutex, Weak,
atomic::{AtomicU64, Ordering},
};
#[cfg(feature = "feldera-enterprise")]
use std::sync::atomic::Ordering;
use std::sync::{Arc, LazyLock, atomic::AtomicU64};

#[cfg(feature = "feldera-enterprise")]
use dbsp::circuit::{CircuitStorageConfig, checkpointer::Checkpointer};
use feldera_adapterlib::errors::journal::ControllerError;
#[cfg(feature = "feldera-enterprise")]
use feldera_storage::StoragePath;
use feldera_storage::{
StorageBackend, StoragePath, checkpoint_synchronizer::CheckpointSynchronizer,
StorageBackend, checkpoint_synchronizer::CheckpointSynchronizer,
histogram::ExponentialHistogram,
};
#[cfg(feature = "feldera-enterprise")]
use feldera_types::{
checkpoint::CheckpointMetadata,
config::{FileBackendConfig, StorageBackendConfig, SyncConfig},
config::{FileBackendConfig, StorageBackendConfig},
constants::ACTIVATION_MARKER_FILE,
};

use crate::server::ServerState;
use feldera_types::{checkpoint::HostInfo, config::SyncConfig};

// Pull metrics
/// Bytes transferred when pulling a checkpoint.
Expand Down Expand Up @@ -78,9 +80,11 @@ fn pull_and_gc(
storage: Arc<dyn StorageBackend>,
sync: &SyncConfig,
prev: &mut uuid::Uuid,
host_info: Option<HostInfo>,
standby: bool,
) -> Result<CheckpointMetadata, ControllerError> {
match SYNCHRONIZER
.pull(storage.clone(), sync.to_owned())
.pull(storage.clone(), sync.to_owned(), host_info, standby)
.map_err(|e| ControllerError::checkpoint_fetch_error(format!("{e:?}")))
{
Err(err) => {
Expand Down Expand Up @@ -127,16 +131,53 @@ pub fn is_pull_necessary(storage: &CircuitStorageConfig) -> Option<&SyncConfig>
}

#[cfg(feature = "feldera-enterprise")]
pub fn pull_once(storage: &CircuitStorageConfig, sync: &SyncConfig) -> Result<(), ControllerError> {
pull_and_gc(storage.backend.clone(), sync, &mut uuid::Uuid::nil())?;
pub fn pull_once(
storage: &CircuitStorageConfig,
sync: &SyncConfig,
host_info: Option<HostInfo>,
) -> Result<(), ControllerError> {
pull_and_gc(
storage.backend.clone(),
sync,
&mut uuid::Uuid::nil(),
host_info,
false,
)?;

Ok(())
}

/// Pulls the latest checkpoint from object storage directly using a storage
/// backend, without a full `CircuitStorageConfig`.
///
/// Used by the multihost coordinator's pull endpoint, which has access to the
/// storage backend but not the full circuit storage config.
#[cfg(feature = "feldera-enterprise")]
pub fn pull_once_with_backend(
storage: Arc<dyn StorageBackend>,
sync: &SyncConfig,
host_info: Option<HostInfo>,
standby: bool,
) -> Result<(), ControllerError> {
pull_and_gc(storage, sync, &mut uuid::Uuid::nil(), host_info, standby)?;
Ok(())
}

#[cfg(not(feature = "feldera-enterprise"))]
pub fn pull_once_with_backend(
_storage: Arc<dyn StorageBackend>,
_sync: &SyncConfig,
_host_info: Option<HostInfo>,
_standby: bool,
) -> Result<(), ControllerError> {
Err(ControllerError::EnterpriseFeature("checkpoint pull"))
}

#[cfg(feature = "feldera-enterprise")]
pub fn continuous_pull<F>(
storage: &CircuitStorageConfig,
is_activated: F,
host_info: Option<HostInfo>,
) -> Result<(), ControllerError>
where
F: Fn() -> bool,
Expand Down Expand Up @@ -194,7 +235,7 @@ where
// Also, if we receive an activation signal, we run one more iteration to
// ensure that we have the latest checkpoint before activating.
loop {
match pull_and_gc(storage.backend.clone(), sync, &mut prev) {
match pull_and_gc(storage.backend.clone(), sync, &mut prev, host_info, true) {
Err(err) => {
// On our final attempt to pull the checkpoint after activation, if we fail, we should error out and not activate with a potentially stale or missing checkpoint.
if pull_once_again_after_activation {
Expand All @@ -206,10 +247,6 @@ where
}
};

if !sync.standby {
return Ok(());
}

if is_activated() {
if pull_once_again_after_activation {
// We've already done one iteration after activation, now we're done
Expand Down
Loading
Loading