Skip to content

Commit c89ff35

Browse files
committed
sync: support automatic periodic checkpoint syncs
Adds a parameter `push_interval` to `SyncConfig` to specify the interval at which to push the latest checkpoint to object storage. Default is None, disabling the periodic push. Adds a field `periodic` to response of `/checkpoint/sync_status`, that maintains the latest periodic checkpoint that was synced. I chose adding a field instead of setting it to `success` as I think the user should be able to know if the sync was automatic or manual; it also affects allows us to maintain the current the behavior of the python client downstream. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent ac92090 commit c89ff35

7 files changed

Lines changed: 186 additions & 33 deletions

File tree

crates/adapters/src/controller.rs

Lines changed: 91 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ pub use feldera_types::config::{
148148
ConnectorConfig, FormatConfig, InputEndpointConfig, OutputEndpointConfig, PipelineConfig,
149149
RuntimeConfig, TransportConfig,
150150
};
151-
use feldera_types::config::{FileBackendConfig, FtConfig, FtModel, OutputBufferConfig, SyncConfig};
151+
use feldera_types::config::{
152+
FileBackendConfig, FtConfig, FtModel, OutputBufferConfig, StorageBackendConfig, SyncConfig,
153+
};
152154
use feldera_types::constants::{STATE_FILE, STEPS_FILE};
153155
use feldera_types::format::json::{JsonFlavor, JsonParserConfig, JsonUpdateFormat};
154156
use feldera_types::program_schema::{SqlIdentifier, canonical_identifier};
@@ -522,6 +524,10 @@ impl Controller {
522524
self.inner.last_checkpoint()
523525
}
524526

527+
pub(crate) fn last_checkpoint_sync(&self) -> LastCheckpoint {
528+
self.inner.last_checkpoint_sync()
529+
}
530+
525531
pub fn lir(&self) -> &LirCircuit {
526532
&self.inner.lir
527533
}
@@ -1693,14 +1699,21 @@ impl RunningCheckpointSync {
16931699
Ok(Self::Waiting(uuid, join_handle, receiver))
16941700
}
16951701

1696-
fn poll(&mut self) -> Option<Result<(), Arc<ControllerError>>> {
1702+
fn poll(&mut self, circuit: &mut CircuitThread) -> Option<Result<(), Arc<ControllerError>>> {
16971703
let uuid = self.uuid();
16981704

16991705
match replace(self, Self::Done(uuid)) {
17001706
Self::Error(_, error) => Some(Err(error)),
17011707
Self::Waiting(uuid, join_handle, mut receiver) => match receiver.try_recv() {
17021708
Ok(result) => {
17031709
join_handle.join().unwrap();
1710+
1711+
let mut last_sync = circuit.controller.last_checkpoint_sync.lock().unwrap();
1712+
*last_sync = LastCheckpoint {
1713+
timestamp: Instant::now(),
1714+
id: Some(uuid),
1715+
};
1716+
17041717
Some(result)
17051718
}
17061719
Err(TryRecvError::Empty) => {
@@ -1716,9 +1729,21 @@ impl RunningCheckpointSync {
17161729
}
17171730
}
17181731

1719-
struct SyncCheckpointRequest {
1720-
uuid: uuid::Uuid,
1721-
cb: SyncCheckpointCallbackFn,
1732+
enum SyncCheckpointRequest {
1733+
Scheduled(uuid::Uuid),
1734+
Requested {
1735+
uuid: uuid::Uuid,
1736+
cb: SyncCheckpointCallbackFn,
1737+
},
1738+
}
1739+
1740+
impl SyncCheckpointRequest {
1741+
fn uuid(&self) -> uuid::Uuid {
1742+
match self {
1743+
Self::Scheduled(uuid) => *uuid,
1744+
Self::Requested { uuid, .. } => *uuid,
1745+
}
1746+
}
17221747
}
17231748

17241749
#[derive(Clone)]
@@ -2136,17 +2161,22 @@ impl CircuitThread {
21362161
output_backpressure_warning = None;
21372162

21382163
match trigger.trigger(
2139-
self.last_checkpoint().timestamp,
2164+
self.last_checkpoint(),
2165+
self.last_checkpoint_sync(),
21402166
self.replaying(),
21412167
self.circuit.bootstrap_in_progress(),
21422168
self.checkpoint_requested(),
2169+
self.sync_checkpoint_requested(),
21432170
) {
21442171
Action::Step => {
21452172
if !self.step()? {
21462173
break;
21472174
}
21482175
}
21492176
Action::Checkpoint => self.checkpoint_requests.push(CheckpointRequest::Scheduled),
2177+
Action::SyncCheckpoint(chk) => self
2178+
.sync_checkpoint_requests
2179+
.push(SyncCheckpointRequest::Scheduled(chk)),
21502180
Action::Park(Some(deadline)) => self.parker.park_deadline(deadline),
21512181
Action::Park(None) => self.parker.park(),
21522182
}
@@ -2341,6 +2371,10 @@ impl CircuitThread {
23412371
self.controller.last_checkpoint()
23422372
}
23432373

2374+
fn last_checkpoint_sync(&self) -> LastCheckpoint {
2375+
self.controller.last_checkpoint_sync()
2376+
}
2377+
23442378
fn update_last_checkpoint(&self, result: &Result<Checkpoint, ControllerError>) {
23452379
*self.controller.last_checkpoint.lock().unwrap() = LastCheckpoint {
23462380
timestamp: Instant::now(),
@@ -2437,10 +2471,11 @@ impl CircuitThread {
24372471
.push(CheckpointRequest::SuspendCommand(reply_callback));
24382472
}
24392473
Command::SyncCheckpoint((uuid, reply_callback)) => {
2440-
self.sync_checkpoint_requests.push(SyncCheckpointRequest {
2441-
uuid,
2442-
cb: reply_callback,
2443-
});
2474+
self.sync_checkpoint_requests
2475+
.push(SyncCheckpointRequest::Requested {
2476+
uuid,
2477+
cb: reply_callback,
2478+
});
24442479
}
24452480
}
24462481
}
@@ -2803,8 +2838,10 @@ impl CircuitThread {
28032838
uuid: uuid::Uuid,
28042839
result: Result<(), Arc<ControllerError>>,
28052840
) {
2806-
for request in requests.extract_if(.., |x| x.uuid == uuid) {
2807-
(request.cb)(result.clone());
2841+
for request in requests.extract_if(.., |x| x.uuid() == uuid) {
2842+
if let SyncCheckpointRequest::Requested { cb, .. } = request {
2843+
(cb)(result.clone());
2844+
}
28082845
}
28092846
}
28102847

@@ -2814,7 +2851,7 @@ impl CircuitThread {
28142851

28152852
match running_sync {
28162853
RunningCheckpointSync::Waiting(uuid, _, _) => {
2817-
let Some(result) = running_sync.poll() else {
2854+
let Some(result) = running_sync.poll(self) else {
28182855
self.running_checkpoint_sync = Some(running_sync);
28192856
return;
28202857
};
@@ -2836,7 +2873,7 @@ impl CircuitThread {
28362873
};
28372874

28382875
if self.running_checkpoint_sync.is_none() {
2839-
self.running_checkpoint_sync = Some(RunningCheckpointSync::new(self, req.uuid));
2876+
self.running_checkpoint_sync = Some(RunningCheckpointSync::new(self, req.uuid()));
28402877
}
28412878
}
28422879
}
@@ -3168,6 +3205,9 @@ struct StepTrigger {
31683205
/// Time between automatic checkpoints.
31693206
checkpoint_interval: Option<Duration>,
31703207

3208+
/// Time between automatic checkpoint syncs.
3209+
sync_interval: Option<Duration>,
3210+
31713211
/// The circuit is bootstrapping. Used to detect the transition from bootstrapping
31723212
/// to normal mode.
31733213
bootstrapping: bool,
@@ -3184,6 +3224,9 @@ enum Action {
31843224

31853225
/// Step the circuit.
31863226
Step,
3227+
3228+
/// Synchronize a checkpoint to object storage.
3229+
SyncCheckpoint(uuid::Uuid),
31873230
}
31883231

31893232
impl StepTrigger {
@@ -3193,19 +3236,30 @@ impl StepTrigger {
31933236
let max_buffering_delay = Duration::from_micros(config.max_buffering_delay_usecs);
31943237
let min_batch_size_records = config.min_batch_size_records;
31953238
let checkpoint_interval = config.fault_tolerance.checkpoint_interval();
3239+
let sync_interval = config.storage.as_ref().and_then(|s| match &s.backend {
3240+
StorageBackendConfig::File(file) => file
3241+
.sync
3242+
.as_ref()
3243+
.and_then(|s| s.push_interval)
3244+
.map(Duration::from_secs),
3245+
_ => None,
3246+
});
3247+
31963248
Self {
31973249
controller,
31983250
buffer_timeout: None,
31993251
max_buffering_delay,
32003252
min_batch_size_records,
32013253
checkpoint_interval,
32023254
bootstrapping,
3255+
sync_interval,
32033256
}
32043257
}
32053258

32063259
/// Determines when to trigger the next step, given:
32073260
///
3208-
/// - The time of the last checkpoint.
3261+
/// - The metadata about the last checkpoint.
3262+
/// - The metadata about the last sync checkpoint.
32093263
/// - Whether we're currently `replaying`.
32103264
/// - Whether the pipeline is currently `bootstrapping`.
32113265
/// - Whether the pipeline is currently `running`.
@@ -3214,10 +3268,12 @@ impl StepTrigger {
32143268
/// Returns the action for the controller to take.
32153269
fn trigger(
32163270
&mut self,
3217-
last_checkpoint: Instant,
3271+
last_checkpoint: LastCheckpoint,
3272+
last_sync: LastCheckpoint,
32183273
replaying: bool,
32193274
bootstrapping: bool,
32203275
checkpoint_requested: bool,
3276+
sync_checkpoint_requested: bool,
32213277
) -> Action {
32223278
// If any input endpoints are blocking suspend, then those are the only
32233279
// ones that we count; otherwise, count all of them.
@@ -3238,7 +3294,12 @@ impl StepTrigger {
32383294
// Time of the next checkpoint.
32393295
let checkpoint = self
32403296
.checkpoint_interval
3241-
.map(|interval| last_checkpoint + interval);
3297+
.map(|interval| last_checkpoint.timestamp + interval);
3298+
3299+
// Time of the next checkpoint sync.
3300+
let sync_checkpoint = self
3301+
.sync_interval
3302+
.map(|interval| last_sync.timestamp + interval);
32423303

32433304
// Used to force a step regardless of input
32443305
let committing = self.controller.transaction_commit_requested();
@@ -3258,6 +3319,13 @@ impl StepTrigger {
32583319
step(self)
32593320
} else if checkpoint.is_some_and(|t| now >= t) && !checkpoint_requested {
32603321
Action::Checkpoint
3322+
} else if sync_checkpoint.is_some_and(|t| now >= t)
3323+
&& !sync_checkpoint_requested
3324+
&& let Some(chk) = last_checkpoint.id
3325+
&& !chk.is_nil()
3326+
&& Some(chk) != last_sync.id
3327+
{
3328+
Action::SyncCheckpoint(chk)
32613329
} else if self.controller.status.unset_step_requested()
32623330
|| buffered_records > self.min_batch_size_records
32633331
|| self.buffer_timeout.is_some_and(|t| now >= t)
@@ -4284,6 +4352,7 @@ impl TransactionInfo {
42844352
pub struct ControllerInner {
42854353
pub status: Arc<ControllerStatus>,
42864354
last_checkpoint: Mutex<LastCheckpoint>,
4355+
last_checkpoint_sync: Mutex<LastCheckpoint>,
42874356
secrets_dir: PathBuf,
42884357
num_api_connections: AtomicU64,
42894358
command_sender: Sender<Command>,
@@ -4354,6 +4423,7 @@ impl ControllerInner {
43544423
command_sender,
43554424
catalog: Arc::new(catalog),
43564425
last_checkpoint: Default::default(),
4426+
last_checkpoint_sync: Default::default(),
43574427
lir,
43584428
trace_snapshot: Arc::new(TokioMutex::new(BTreeMap::new())),
43594429
next_input_id: Atomic::new(0),
@@ -4451,6 +4521,10 @@ impl ControllerInner {
44514521
self.last_checkpoint.lock().unwrap().clone()
44524522
}
44534523

4524+
fn last_checkpoint_sync(&self) -> LastCheckpoint {
4525+
self.last_checkpoint_sync.lock().unwrap().clone()
4526+
}
4527+
44544528
fn get_transaction_number(&self) -> u64 {
44554529
self.transaction_number.load(Ordering::Acquire)
44564530
}

crates/adapters/src/server.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ impl CheckpointSyncState {
160160
}
161161
}
162162
}
163+
164+
fn completed_periodic(&mut self, uuid: uuid::Uuid) {
165+
self.status.periodic = Some(uuid);
166+
}
163167
}
164168

165169
#[derive(Clone, Debug, Default)]
@@ -1681,8 +1685,17 @@ async fn checkpoint_status(state: WebData<ServerState>) -> impl Responder {
16811685
}
16821686

16831687
#[get("/checkpoint/sync_status")]
1684-
async fn sync_checkpoint_status(state: WebData<ServerState>) -> impl Responder {
1685-
HttpResponse::Ok().json(state.sync_checkpoint_state.lock().unwrap().status.clone())
1688+
async fn sync_checkpoint_status(
1689+
state: WebData<ServerState>,
1690+
) -> Result<HttpResponse, PipelineError> {
1691+
let mut sync_state = state.sync_checkpoint_state.lock().unwrap();
1692+
1693+
let controller = state.controller()?;
1694+
if let Some(chk) = controller.last_checkpoint_sync().id {
1695+
sync_state.completed_periodic(chk);
1696+
}
1697+
1698+
Ok(HttpResponse::Ok().json(sync_state.status.clone()))
16861699
}
16871700

16881701
/// Suspends the pipeline and terminate the circuit.

crates/feldera-types/src/checkpoint.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ pub struct CheckpointSyncStatus {
5858

5959
/// Most recently failed checkpoint sync, and the associated error.
6060
pub failure: Option<CheckpointSyncFailure>,
61+
62+
/// Most recently successful automated periodic checkpoint sync.
63+
pub periodic: Option<Uuid>,
6164
}
6265

6366
/// Information about a failed checkpoint sync.

crates/feldera-types/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,12 @@ pub struct SyncConfig {
527527
#[serde(default = "default_pull_interval")]
528528
pub pull_interval: u64,
529529

530+
/// The interval (in seconds) between each push of checkpoints to object store.
531+
///
532+
/// Default: disabled (no periodic push).
533+
#[serde(default)]
534+
pub push_interval: Option<u64>,
535+
530536
/// Extra flags to pass to `rclone`.
531537
///
532538
/// WARNING: Supplying incorrect or conflicting flags can break `rclone`.

openapi.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11518,6 +11518,13 @@
1151811518
"default": 10,
1151911519
"minimum": 0
1152011520
},
11521+
"push_interval": {
11522+
"type": "integer",
11523+
"format": "int64",
11524+
"description": "The interval (in seconds) between each push of checkpoints to object store.\n\nDefault: disabled (no periodic push).",
11525+
"nullable": true,
11526+
"minimum": 0
11527+
},
1152111528
"region": {
1152211529
"type": "string",
1152311530
"description": "The region that this bucket is in.\n\nLeave empty for Minio or the default region (`us-east-1` for AWS).",

python/feldera/pipeline.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,10 +825,11 @@ def sync_checkpoint_status(self, uuid: str) -> CheckpointStatus:
825825

826826
resp = self.client.sync_checkpoint_status(self.name)
827827
success = resp.get("success")
828+
periodic = resp.get("periodic")
828829

829830
fail = resp.get("failure") or {}
830831

831-
if uuid == success:
832+
if uuid == success or uuid == periodic:
832833
return CheckpointStatus.Success
833834

834835
fail = resp.get("failure") or {}
@@ -843,6 +844,26 @@ def sync_checkpoint_status(self, uuid: str) -> CheckpointStatus:
843844

844845
return CheckpointStatus.Unknown
845846

847+
def last_successful_checkpoint_sync(self) -> UUID:
848+
"""
849+
Returns the UUID of the last successfully synced checkpoint.
850+
851+
:return: The UUID of the last successfully synced checkpoint.
852+
"""
853+
854+
resp = self.client.sync_checkpoint_status(self.name)
855+
success = resp.get("success")
856+
periodic = resp.get("periodic")
857+
858+
if success is None and periodic is None:
859+
raise RuntimeError("no checkpoints have been synced yet")
860+
elif success is None:
861+
return UUID(periodic)
862+
elif periodic is None:
863+
return UUID(success)
864+
else:
865+
return max(UUID(success), UUID(periodic))
866+
846867
def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
847868
"""
848869
Executes an ad-hoc SQL query on this pipeline and returns a generator

0 commit comments

Comments
 (0)