@@ -148,7 +148,9 @@ pub use feldera_types::config::{
148148 ConnectorConfig , FormatConfig , InputEndpointConfig , OutputEndpointConfig , PipelineConfig ,
149149 RuntimeConfig , TransportConfig ,
150150} ;
151- use feldera_types:: config:: { FileBackendConfig , FtConfig , FtModel , OutputBufferConfig , SyncConfig } ;
151+ use feldera_types:: config:: {
152+ FileBackendConfig , FtConfig , FtModel , OutputBufferConfig , StorageBackendConfig , SyncConfig ,
153+ } ;
152154use feldera_types:: constants:: { STATE_FILE , STEPS_FILE } ;
153155use feldera_types:: format:: json:: { JsonFlavor , JsonParserConfig , JsonUpdateFormat } ;
154156use feldera_types:: program_schema:: { SqlIdentifier , canonical_identifier} ;
@@ -522,6 +524,10 @@ impl Controller {
522524 self . inner . last_checkpoint ( )
523525 }
524526
527+ pub ( crate ) fn last_checkpoint_sync ( & self ) -> LastCheckpoint {
528+ self . inner . last_checkpoint_sync ( )
529+ }
530+
525531 pub fn lir ( & self ) -> & LirCircuit {
526532 & self . inner . lir
527533 }
@@ -1693,14 +1699,21 @@ impl RunningCheckpointSync {
16931699 Ok ( Self :: Waiting ( uuid, join_handle, receiver) )
16941700 }
16951701
1696- fn poll ( & mut self ) -> Option < Result < ( ) , Arc < ControllerError > > > {
1702+ fn poll ( & mut self , circuit : & mut CircuitThread ) -> Option < Result < ( ) , Arc < ControllerError > > > {
16971703 let uuid = self . uuid ( ) ;
16981704
16991705 match replace ( self , Self :: Done ( uuid) ) {
17001706 Self :: Error ( _, error) => Some ( Err ( error) ) ,
17011707 Self :: Waiting ( uuid, join_handle, mut receiver) => match receiver. try_recv ( ) {
17021708 Ok ( result) => {
17031709 join_handle. join ( ) . unwrap ( ) ;
1710+
1711+ let mut last_sync = circuit. controller . last_checkpoint_sync . lock ( ) . unwrap ( ) ;
1712+ * last_sync = LastCheckpoint {
1713+ timestamp : Instant :: now ( ) ,
1714+ id : Some ( uuid) ,
1715+ } ;
1716+
17041717 Some ( result)
17051718 }
17061719 Err ( TryRecvError :: Empty ) => {
@@ -1716,9 +1729,21 @@ impl RunningCheckpointSync {
17161729 }
17171730}
17181731
1719- struct SyncCheckpointRequest {
1720- uuid : uuid:: Uuid ,
1721- cb : SyncCheckpointCallbackFn ,
1732+ enum SyncCheckpointRequest {
1733+ Scheduled ( uuid:: Uuid ) ,
1734+ Requested {
1735+ uuid : uuid:: Uuid ,
1736+ cb : SyncCheckpointCallbackFn ,
1737+ } ,
1738+ }
1739+
1740+ impl SyncCheckpointRequest {
1741+ fn uuid ( & self ) -> uuid:: Uuid {
1742+ match self {
1743+ Self :: Scheduled ( uuid) => * uuid,
1744+ Self :: Requested { uuid, .. } => * uuid,
1745+ }
1746+ }
17221747}
17231748
17241749#[ derive( Clone ) ]
@@ -2136,17 +2161,22 @@ impl CircuitThread {
21362161 output_backpressure_warning = None ;
21372162
21382163 match trigger. trigger (
2139- self . last_checkpoint ( ) . timestamp ,
2164+ self . last_checkpoint ( ) ,
2165+ self . last_checkpoint_sync ( ) ,
21402166 self . replaying ( ) ,
21412167 self . circuit . bootstrap_in_progress ( ) ,
21422168 self . checkpoint_requested ( ) ,
2169+ self . sync_checkpoint_requested ( ) ,
21432170 ) {
21442171 Action :: Step => {
21452172 if !self . step ( ) ? {
21462173 break ;
21472174 }
21482175 }
21492176 Action :: Checkpoint => self . checkpoint_requests . push ( CheckpointRequest :: Scheduled ) ,
2177+ Action :: SyncCheckpoint ( chk) => self
2178+ . sync_checkpoint_requests
2179+ . push ( SyncCheckpointRequest :: Scheduled ( chk) ) ,
21502180 Action :: Park ( Some ( deadline) ) => self . parker . park_deadline ( deadline) ,
21512181 Action :: Park ( None ) => self . parker . park ( ) ,
21522182 }
@@ -2341,6 +2371,10 @@ impl CircuitThread {
23412371 self . controller . last_checkpoint ( )
23422372 }
23432373
2374+ fn last_checkpoint_sync ( & self ) -> LastCheckpoint {
2375+ self . controller . last_checkpoint_sync ( )
2376+ }
2377+
23442378 fn update_last_checkpoint ( & self , result : & Result < Checkpoint , ControllerError > ) {
23452379 * self . controller . last_checkpoint . lock ( ) . unwrap ( ) = LastCheckpoint {
23462380 timestamp : Instant :: now ( ) ,
@@ -2437,10 +2471,11 @@ impl CircuitThread {
24372471 . push ( CheckpointRequest :: SuspendCommand ( reply_callback) ) ;
24382472 }
24392473 Command :: SyncCheckpoint ( ( uuid, reply_callback) ) => {
2440- self . sync_checkpoint_requests . push ( SyncCheckpointRequest {
2441- uuid,
2442- cb : reply_callback,
2443- } ) ;
2474+ self . sync_checkpoint_requests
2475+ . push ( SyncCheckpointRequest :: Requested {
2476+ uuid,
2477+ cb : reply_callback,
2478+ } ) ;
24442479 }
24452480 }
24462481 }
@@ -2803,8 +2838,10 @@ impl CircuitThread {
28032838 uuid : uuid:: Uuid ,
28042839 result : Result < ( ) , Arc < ControllerError > > ,
28052840 ) {
2806- for request in requests. extract_if ( .., |x| x. uuid == uuid) {
2807- ( request. cb ) ( result. clone ( ) ) ;
2841+ for request in requests. extract_if ( .., |x| x. uuid ( ) == uuid) {
2842+ if let SyncCheckpointRequest :: Requested { cb, .. } = request {
2843+ ( cb) ( result. clone ( ) ) ;
2844+ }
28082845 }
28092846 }
28102847
@@ -2814,7 +2851,7 @@ impl CircuitThread {
28142851
28152852 match running_sync {
28162853 RunningCheckpointSync :: Waiting ( uuid, _, _) => {
2817- let Some ( result) = running_sync. poll ( ) else {
2854+ let Some ( result) = running_sync. poll ( self ) else {
28182855 self . running_checkpoint_sync = Some ( running_sync) ;
28192856 return ;
28202857 } ;
@@ -2836,7 +2873,7 @@ impl CircuitThread {
28362873 } ;
28372874
28382875 if self . running_checkpoint_sync . is_none ( ) {
2839- self . running_checkpoint_sync = Some ( RunningCheckpointSync :: new ( self , req. uuid ) ) ;
2876+ self . running_checkpoint_sync = Some ( RunningCheckpointSync :: new ( self , req. uuid ( ) ) ) ;
28402877 }
28412878 }
28422879}
@@ -3168,6 +3205,9 @@ struct StepTrigger {
31683205 /// Time between automatic checkpoints.
31693206 checkpoint_interval : Option < Duration > ,
31703207
3208+ /// Time between automatic checkpoint syncs.
3209+ sync_interval : Option < Duration > ,
3210+
31713211 /// The circuit is bootstrapping. Used to detect the transition from bootstrapping
31723212 /// to normal mode.
31733213 bootstrapping : bool ,
@@ -3184,6 +3224,9 @@ enum Action {
31843224
31853225 /// Step the circuit.
31863226 Step ,
3227+
3228+ /// Synchronize a checkpoint to object storage.
3229+ SyncCheckpoint ( uuid:: Uuid ) ,
31873230}
31883231
31893232impl StepTrigger {
@@ -3193,19 +3236,30 @@ impl StepTrigger {
31933236 let max_buffering_delay = Duration :: from_micros ( config. max_buffering_delay_usecs ) ;
31943237 let min_batch_size_records = config. min_batch_size_records ;
31953238 let checkpoint_interval = config. fault_tolerance . checkpoint_interval ( ) ;
3239+ let sync_interval = config. storage . as_ref ( ) . and_then ( |s| match & s. backend {
3240+ StorageBackendConfig :: File ( file) => file
3241+ . sync
3242+ . as_ref ( )
3243+ . and_then ( |s| s. push_interval )
3244+ . map ( Duration :: from_secs) ,
3245+ _ => None ,
3246+ } ) ;
3247+
31963248 Self {
31973249 controller,
31983250 buffer_timeout : None ,
31993251 max_buffering_delay,
32003252 min_batch_size_records,
32013253 checkpoint_interval,
32023254 bootstrapping,
3255+ sync_interval,
32033256 }
32043257 }
32053258
32063259 /// Determines when to trigger the next step, given:
32073260 ///
3208- /// - The time of the last checkpoint.
3261+ /// - The metadata about the last checkpoint.
3262+ /// - The metadata about the last sync checkpoint.
32093263 /// - Whether we're currently `replaying`.
32103264 /// - Whether the pipeline is currently `bootstrapping`.
32113265 /// - Whether the pipeline is currently `running`.
@@ -3214,10 +3268,12 @@ impl StepTrigger {
32143268 /// Returns the action for the controller to take.
32153269 fn trigger (
32163270 & mut self ,
3217- last_checkpoint : Instant ,
3271+ last_checkpoint : LastCheckpoint ,
3272+ last_sync : LastCheckpoint ,
32183273 replaying : bool ,
32193274 bootstrapping : bool ,
32203275 checkpoint_requested : bool ,
3276+ sync_checkpoint_requested : bool ,
32213277 ) -> Action {
32223278 // If any input endpoints are blocking suspend, then those are the only
32233279 // ones that we count; otherwise, count all of them.
@@ -3238,7 +3294,12 @@ impl StepTrigger {
32383294 // Time of the next checkpoint.
32393295 let checkpoint = self
32403296 . checkpoint_interval
3241- . map ( |interval| last_checkpoint + interval) ;
3297+ . map ( |interval| last_checkpoint. timestamp + interval) ;
3298+
3299+ // Time of the next checkpoint sync.
3300+ let sync_checkpoint = self
3301+ . sync_interval
3302+ . map ( |interval| last_sync. timestamp + interval) ;
32423303
32433304 // Used to force a step regardless of input
32443305 let committing = self . controller . transaction_commit_requested ( ) ;
@@ -3258,6 +3319,13 @@ impl StepTrigger {
32583319 step ( self )
32593320 } else if checkpoint. is_some_and ( |t| now >= t) && !checkpoint_requested {
32603321 Action :: Checkpoint
3322+ } else if sync_checkpoint. is_some_and ( |t| now >= t)
3323+ && !sync_checkpoint_requested
3324+ && let Some ( chk) = last_checkpoint. id
3325+ && !chk. is_nil ( )
3326+ && Some ( chk) != last_sync. id
3327+ {
3328+ Action :: SyncCheckpoint ( chk)
32613329 } else if self . controller . status . unset_step_requested ( )
32623330 || buffered_records > self . min_batch_size_records
32633331 || self . buffer_timeout . is_some_and ( |t| now >= t)
@@ -4284,6 +4352,7 @@ impl TransactionInfo {
42844352pub struct ControllerInner {
42854353 pub status : Arc < ControllerStatus > ,
42864354 last_checkpoint : Mutex < LastCheckpoint > ,
4355+ last_checkpoint_sync : Mutex < LastCheckpoint > ,
42874356 secrets_dir : PathBuf ,
42884357 num_api_connections : AtomicU64 ,
42894358 command_sender : Sender < Command > ,
@@ -4354,6 +4423,7 @@ impl ControllerInner {
43544423 command_sender,
43554424 catalog : Arc :: new ( catalog) ,
43564425 last_checkpoint : Default :: default ( ) ,
4426+ last_checkpoint_sync : Default :: default ( ) ,
43574427 lir,
43584428 trace_snapshot : Arc :: new ( TokioMutex :: new ( BTreeMap :: new ( ) ) ) ,
43594429 next_input_id : Atomic :: new ( 0 ) ,
@@ -4451,6 +4521,10 @@ impl ControllerInner {
44514521 self . last_checkpoint . lock ( ) . unwrap ( ) . clone ( )
44524522 }
44534523
4524+ fn last_checkpoint_sync ( & self ) -> LastCheckpoint {
4525+ self . last_checkpoint_sync . lock ( ) . unwrap ( ) . clone ( )
4526+ }
4527+
44544528 fn get_transaction_number ( & self ) -> u64 {
44554529 self . transaction_number . load ( Ordering :: Acquire )
44564530 }
0 commit comments