Skip to content

Commit b50523e

Browse files
blp authored and ryzhyk committed
[adapters] Make /checkpoint API asynchronous.
A checkpoint usually completes quickly but in some cases it can take a while. In particular, it can take an indefinite amount of time if some input connector needs to advance to a usable breakpoint. This means that having `/checkpoint` be synchronous is impractical. This commit changes `/checkpoint` to just initiate a checkpoint and return an associated sequence number. The client can then poll `/checkpoint_status` to find out when it has completed. Also updates the pipeline-manager to match. Fixes #4022. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 0dd6f5b commit b50523e

5 files changed

Lines changed: 133 additions & 23 deletions

File tree

crates/adapters/src/server/mod.rs

Lines changed: 62 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ use colored::{ColoredString, Colorize};
2626
use dbsp::{circuit::CircuitConfig, DBSPHandle};
2727
use dbsp::{RootCircuit, Runtime};
2828
use dyn_clone::DynClone;
29+
use feldera_types::checkpoint::CheckpointStatus;
2930
use feldera_types::completion_token::{
3031
CompletionStatusArgs, CompletionStatusResponse, CompletionTokenResponse,
3132
};
@@ -113,11 +114,52 @@ fn missing_controller_error(state: &ServerState) -> PipelineError {
113114
}
114115
}
115116

117+
/// Bookkeeping for checkpoint requests made through `/checkpoint`: hands out
/// request sequence numbers and holds the status served by
/// `/checkpoint_status`.
#[derive(Clone, Debug, Default)]
118+
struct CheckpointState {
119+
/// Sequence number for the next checkpoint request.
///
/// Starts at 0 (the `Default` value) and is advanced by `next_seq()`.
120+
next_seq: u64,
121+
122+
/// Status to report to user: the highest-numbered successful request and
/// the highest-numbered failed request recorded so far.
123+
status: CheckpointStatus,
124+
}
125+
126+
impl CheckpointState {
127+
/// Returns the sequence number to use for the next checkpoint request.
///
/// Each call returns the current counter value and then advances the
/// counter by one, so successive calls return consecutive numbers.
128+
fn next_seq(&mut self) -> u64 {
129+
let seq = self.next_seq;
130+
self.next_seq += 1;
131+
seq
132+
}
133+
134+
/// Updates the state for completion of the checkpoint request with sequence
135+
/// number `seq` with status `result`.
///
/// An outcome is recorded only if no request of that kind has been recorded
/// yet or `seq` is greater than the recorded one, so each slot always holds
/// the highest-numbered request of its kind. Successes and failures are
/// tracked independently of each other.
136+
fn completed(&mut self, seq: u64, result: Result<(), Arc<ControllerError>>) {
137+
match result {
138+
Ok(()) => {
139+
// Never move backwards: a success for an older request must not
// overwrite one already recorded for a newer request.
if self.status.success.is_none_or(|success| success < seq) {
140+
self.status.success = Some(seq);
141+
}
142+
}
143+
Err(error) => {
144+
// Same forward-only rule for failures.
if self
145+
.status
146+
.failure
147+
.as_ref()
148+
.is_none_or(|(failure, _)| *failure < seq)
149+
{
150+
// The error is kept as its `to_string()` text alongside the
// sequence number.
self.status.failure = Some((seq, error.to_string()));
151+
}
152+
}
153+
}
154+
}
155+
}
156+
116157
struct ServerState {
117158
phase: RwLock<PipelinePhase>,
118159
metadata: RwLock<String>,
119160
controller: Mutex<Option<Controller>>,
120161
prometheus: RwLock<Option<PrometheusMetrics>>,
162+
checkpoint_state: Arc<Mutex<CheckpointState>>,
121163
/// Channel used to send a `kill` command to
122164
/// the self-destruct task when shutting down
123165
/// the server.
@@ -130,6 +172,7 @@ impl ServerState {
130172
phase: RwLock::new(PipelinePhase::Initializing),
131173
metadata: RwLock::new(String::new()),
132174
controller: Mutex::new(None),
175+
checkpoint_state: Arc::new(Mutex::new(CheckpointState::default())),
133176
prometheus: RwLock::new(None),
134177
terminate_sender,
135178
}
@@ -576,6 +619,7 @@ where
576619
.service(dump_profile)
577620
.service(lir)
578621
.service(checkpoint)
622+
.service(checkpoint_status)
579623
.service(suspend)
580624
.service(input_endpoint)
581625
.service(output_endpoint)
@@ -888,30 +932,27 @@ async fn lir(state: WebData<ServerState>) -> impl Responder {
888932
.body(lir.as_zip()))
889933
}
890934

935+
/// Initiates a checkpoint and returns its sequence number. The caller may poll
936+
/// `/checkpoint_status` to determine when the checkpoint completes.
///
/// Responds with the sequence number as JSON, or with the missing-controller
/// error when no controller is currently set.
891937
#[post("/checkpoint")]
892938
async fn checkpoint(state: WebData<ServerState>) -> impl Responder {
893-
#[cfg(feature = "feldera-enterprise")]
894-
{
895-
let (sender, receiver) = oneshot::channel();
896-
match &*state.controller.lock().unwrap() {
897-
None => return Err(missing_controller_error(&state)),
898-
Some(controller) => {
899-
controller.start_checkpoint(Box::new(move |checkpoint| {
900-
if sender.send(checkpoint.map(|_| ())).is_err() {
901-
error!("`/checkpoint` result could not be sent");
902-
}
903-
}));
904-
}
905-
};
906-
receiver.await.unwrap()?;
907-
Ok(HttpResponse::Ok().json("Checkpoint completed"))
908-
}
939+
let seq = match &*state.controller.lock().unwrap() {
940+
None => return Err(missing_controller_error(&state)),
941+
Some(controller) => {
942+
// Shadow `state` with a clone of the shared checkpoint bookkeeping so
// the `move` closure below can own it.
let state = state.checkpoint_state.clone();
943+
// Reserve the sequence number before starting the checkpoint; the lock
// guard is a temporary that is released at the end of this statement.
let seq = state.lock().unwrap().next_seq();
944+
// Callback handed to `start_checkpoint` (presumably invoked once the
// checkpoint finishes — confirm against the controller).
controller.start_checkpoint(Box::new(move |result| {
945+
// Discard the success payload; only success/failure is recorded
// against `seq`.
state.lock().unwrap().completed(seq, result.map(|_| ()));
946+
}));
947+
seq
948+
}
949+
};
950+
Ok(HttpResponse::Ok().json(seq))
951+
}
909952

910-
#[cfg(not(feature = "feldera-enterprise"))]
911-
{
912-
let _ = state;
913-
Err::<&str, _>(ControllerError::EnterpriseFeature("checkpoint"))
914-
}
953+
/// Returns the current `CheckpointStatus` as JSON.
///
/// The response is a clone of the status taken under the checkpoint-state
/// lock. Callers can compare its sequence numbers with the one returned by
/// `/checkpoint` to learn whether their request has completed.
#[get("/checkpoint_status")]
954+
async fn checkpoint_status(state: WebData<ServerState>) -> impl Responder {
955+
HttpResponse::Ok().json(state.checkpoint_state.lock().unwrap().status.clone())
915956
}
916957

917958
/// Suspends the pipeline (but only a later call to `/shutdown` will shut down
Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,12 @@
1+
use serde::{Deserialize, Serialize};
2+
use utoipa::ToSchema;
3+
4+
/// Checkpoint status returned by the `/checkpoint_status` endpoint.
///
/// Both fields default to `None`.
5+
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
6+
pub struct CheckpointStatus {
7+
/// Most recently successful checkpoint.
///
/// Holds the sequence number of that checkpoint request, or `None` if no
/// success has been recorded.
8+
pub success: Option<u64>,
9+
10+
/// Most recently failed checkpoint, and the associated error.
///
/// The tuple is `(sequence number, error message)`; `None` if no failure
/// has been recorded.
11+
pub failure: Option<(u64, String)>,
12+
}

crates/feldera-types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
pub mod checkpoint;
12
pub mod completion_token;
23
pub mod config;
34
pub mod error;

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 57 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -669,7 +669,10 @@ pub(crate) async fn get_pipeline_circuit_profile(
669669
.await
670670
}
671671

672-
/// Checkpoint a running or paused pipeline.
672+
/// Initiates checkpoint for a running or paused pipeline.
673+
///
674+
/// Returns a sequence number that can be used with `/checkpoint_status` to
675+
/// determine when the checkpoint has completed.
673676
#[utoipa::path(
674677
context_path = "/v0",
675678
security(("JSON web token (JWT) or API key" = [])),
@@ -678,7 +681,7 @@ pub(crate) async fn get_pipeline_circuit_profile(
678681
),
679682
responses(
680683
(status = OK
681-
, description = "Checkpoint completed"),
684+
, description = "Checkpoint initiated"),
682685
(status = NOT_FOUND
683686
, description = "Pipeline with that name does not exist"
684687
, body = ErrorResponse
@@ -728,6 +731,58 @@ pub(crate) async fn checkpoint_pipeline(
728731
}
729732
}
730733

734+
/// Retrieve status of checkpoint activity in a pipeline.
///
/// Forwards a `GET checkpoint_status` request to the named pipeline and
/// returns the result of that forwarding call.
735+
#[utoipa::path(
736+
context_path = "/v0",
737+
security(("JSON web token (JWT) or API key" = [])),
738+
params(
739+
("pipeline_name" = String, Path, description = "Unique pipeline name"),
740+
),
741+
responses(
742+
(status = OK
743+
, description = "Checkpoint status retrieved successfully"
744+
, content_type = "application/json"
745+
, body = CheckpointStatus),
746+
(status = NOT_FOUND
747+
, description = "Pipeline with that name does not exist"
748+
, body = ErrorResponse
749+
, example = json!(examples::error_unknown_pipeline_name())),
750+
(status = SERVICE_UNAVAILABLE
751+
, body = ErrorResponse
752+
, examples(
753+
("Pipeline is not deployed" = (value = json!(examples::error_pipeline_interaction_not_deployed()))),
754+
("Pipeline is currently unavailable" = (value = json!(examples::error_pipeline_interaction_currently_unavailable()))),
755+
("Disconnected during response" = (value = json!(examples::error_pipeline_interaction_disconnected()))),
756+
("Response timeout" = (value = json!(examples::error_pipeline_interaction_timeout())))
757+
)
758+
),
759+
(status = INTERNAL_SERVER_ERROR, body = ErrorResponse),
760+
),
761+
tag = "Pipeline interaction"
762+
)]
763+
#[get("/pipelines/{pipeline_name}/checkpoint_status")]
764+
pub(crate) async fn get_checkpoint_status(
765+
state: WebData<ServerState>,
766+
client: WebData<awc::Client>,
767+
tenant_id: ReqData<TenantId>,
768+
path: web::Path<String>,
769+
request: HttpRequest,
770+
) -> Result<HttpResponse, ManagerError> {
771+
// The single path segment is the pipeline name.
let pipeline_name = path.into_inner();
772+
state
773+
.runner
774+
.forward_http_request_to_pipeline_by_name(
775+
client.as_ref(),
776+
*tenant_id,
777+
&pipeline_name,
778+
Method::GET,
779+
"checkpoint_status",
780+
// Pass the caller's query string through unchanged.
request.query_string(),
781+
// NOTE(review): presumably the (absent) request body — confirm
// against the forwarding helper's signature.
None,
782+
)
783+
.await
784+
}
785+
731786
/// Suspend a running or paused pipeline.
732787
#[utoipa::path(
733788
context_path = "/v0",

crates/pipeline-manager/src/api/main.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -264,6 +264,7 @@ fn api_scope() -> Scope {
264264
.service(endpoints::pipeline_interaction::http_input)
265265
.service(endpoints::pipeline_interaction::http_output)
266266
.service(endpoints::pipeline_interaction::checkpoint_pipeline)
267+
.service(endpoints::pipeline_interaction::get_checkpoint_status)
267268
.service(endpoints::pipeline_interaction::suspend_pipeline)
268269
.service(endpoints::pipeline_interaction::post_pipeline_input_connector_action)
269270
.service(endpoints::pipeline_interaction::get_pipeline_input_connector_status)

0 commit comments

Comments
 (0)