@@ -26,6 +26,7 @@ use colored::{ColoredString, Colorize};
2626use dbsp:: { circuit:: CircuitConfig , DBSPHandle } ;
2727use dbsp:: { RootCircuit , Runtime } ;
2828use dyn_clone:: DynClone ;
29+ use feldera_types:: checkpoint:: CheckpointStatus ;
2930use feldera_types:: completion_token:: {
3031 CompletionStatusArgs , CompletionStatusResponse , CompletionTokenResponse ,
3132} ;
@@ -113,11 +114,52 @@ fn missing_controller_error(state: &ServerState) -> PipelineError {
113114 }
114115}
115116
117+ #[ derive( Clone , Debug , Default ) ]
118+ struct CheckpointState {
119+ /// Sequence number for the next checkpoint request.
120+ next_seq : u64 ,
121+
122+ /// Status to report to user.
123+ status : CheckpointStatus ,
124+ }
125+
126+ impl CheckpointState {
127+ /// Returns the sequence number to use for the next checkpoint request.
128+ fn next_seq ( & mut self ) -> u64 {
129+ let seq = self . next_seq ;
130+ self . next_seq += 1 ;
131+ seq
132+ }
133+
134+ /// Updates the state for completion of the checkpoint request with sequence
135+ /// number `seq` with status `result`.
136+ fn completed ( & mut self , seq : u64 , result : Result < ( ) , Arc < ControllerError > > ) {
137+ match result {
138+ Ok ( ( ) ) => {
139+ if self . status . success . is_none_or ( |success| success < seq) {
140+ self . status . success = Some ( seq) ;
141+ }
142+ }
143+ Err ( error) => {
144+ if self
145+ . status
146+ . failure
147+ . as_ref ( )
148+ . is_none_or ( |( failure, _) | * failure < seq)
149+ {
150+ self . status . failure = Some ( ( seq, error. to_string ( ) ) ) ;
151+ }
152+ }
153+ }
154+ }
155+ }
156+
116157struct ServerState {
117158 phase : RwLock < PipelinePhase > ,
118159 metadata : RwLock < String > ,
119160 controller : Mutex < Option < Controller > > ,
120161 prometheus : RwLock < Option < PrometheusMetrics > > ,
162+ checkpoint_state : Arc < Mutex < CheckpointState > > ,
121163 /// Channel used to send a `kill` command to
122164 /// the self-destruct task when shutting down
123165 /// the server.
@@ -130,6 +172,7 @@ impl ServerState {
130172 phase : RwLock :: new ( PipelinePhase :: Initializing ) ,
131173 metadata : RwLock :: new ( String :: new ( ) ) ,
132174 controller : Mutex :: new ( None ) ,
175+ checkpoint_state : Arc :: new ( Mutex :: new ( CheckpointState :: default ( ) ) ) ,
133176 prometheus : RwLock :: new ( None ) ,
134177 terminate_sender,
135178 }
@@ -576,6 +619,7 @@ where
576619 . service ( dump_profile)
577620 . service ( lir)
578621 . service ( checkpoint)
622+ . service ( checkpoint_status)
579623 . service ( suspend)
580624 . service ( input_endpoint)
581625 . service ( output_endpoint)
@@ -888,30 +932,27 @@ async fn lir(state: WebData<ServerState>) -> impl Responder {
888932 . body ( lir. as_zip ( ) ) )
889933}
890934
935+ /// Initiates a checkpoint and returns its sequence number. The caller may poll
936+ /// `/checkpoint_status` to determine when the checkpoint completes.
891937#[ post( "/checkpoint" ) ]
892938async fn checkpoint ( state : WebData < ServerState > ) -> impl Responder {
893- #[ cfg( feature = "feldera-enterprise" ) ]
894- {
895- let ( sender, receiver) = oneshot:: channel ( ) ;
896- match & * state. controller . lock ( ) . unwrap ( ) {
897- None => return Err ( missing_controller_error ( & state) ) ,
898- Some ( controller) => {
899- controller. start_checkpoint ( Box :: new ( move |checkpoint| {
900- if sender. send ( checkpoint. map ( |_| ( ) ) ) . is_err ( ) {
901- error ! ( "`/checkpoint` result could not be sent" ) ;
902- }
903- } ) ) ;
904- }
905- } ;
906- receiver. await . unwrap ( ) ?;
907- Ok ( HttpResponse :: Ok ( ) . json ( "Checkpoint completed" ) )
908- }
939+ let seq = match & * state. controller . lock ( ) . unwrap ( ) {
940+ None => return Err ( missing_controller_error ( & state) ) ,
941+ Some ( controller) => {
942+ let state = state. checkpoint_state . clone ( ) ;
943+ let seq = state. lock ( ) . unwrap ( ) . next_seq ( ) ;
944+ controller. start_checkpoint ( Box :: new ( move |result| {
945+ state. lock ( ) . unwrap ( ) . completed ( seq, result. map ( |_| ( ) ) ) ;
946+ } ) ) ;
947+ seq
948+ }
949+ } ;
950+ Ok ( HttpResponse :: Ok ( ) . json ( seq) )
951+ }
909952
910- #[ cfg( not( feature = "feldera-enterprise" ) ) ]
911- {
912- let _ = state;
913- Err :: < & str , _ > ( ControllerError :: EnterpriseFeature ( "checkpoint" ) )
914- }
953+ #[ get( "/checkpoint_status" ) ]
954+ async fn checkpoint_status ( state : WebData < ServerState > ) -> impl Responder {
955+ HttpResponse :: Ok ( ) . json ( state. checkpoint_state . lock ( ) . unwrap ( ) . status . clone ( ) )
915956}
916957
917958/// Suspends the pipeline (but only a later call to `/shutdown` will shut down
0 commit comments