Skip to content

Commit 75e4e73

Browse files
committed
pipeline-manager: storage status details
This commit makes three main changes. 1. (New) Details about the storage status (notably, the checkpoints) are added as a pipeline field (`storage_status_details`). The field provides information specifically about the storage and, unlike the runtime status details, remains available after the pipeline has stopped. It is cleared when storage is cleared. The `suspend_info` field is removed, as it is now obsolete and was never user-facing. 2. (Fix) A dedicated error, `BootstrapPolicyImmutableUnlessStopped`, is added for the case where `/start` is called again with a different bootstrap policy. 3. (Refactor) Parsing of rows is refactored to improve reusability. Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 342b7af commit 75e4e73

28 files changed

Lines changed: 1858 additions & 891 deletions

crates/adapters/src/server.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use feldera_types::query_params::{
6969
};
7070
use feldera_types::runtime_status::{
7171
BootstrapPolicy, ExtendedRuntimeStatus, ExtendedRuntimeStatusError, RuntimeDesiredStatus,
72-
RuntimeStatus,
72+
RuntimeStatus, StorageStatusDetails,
7373
};
7474
use feldera_types::suspend::{SuspendError, SuspendableResponse};
7575
use feldera_types::time_series::TimeSeries;
@@ -1354,13 +1354,33 @@ async fn status_handler(
13541354

13551355
#[allow(clippy::result_large_err)]
13561356
fn get_status(state: &ServerState) -> Result<ExtendedRuntimeStatus, ExtendedRuntimeStatusError> {
1357+
// Runtime desired status
13571358
let runtime_desired_status = state.desired_status();
1359+
1360+
// Storage status details
1361+
let storage_status_details = match &state.storage {
1362+
Some(backend) => match Checkpointer::read_checkpoints(&**backend) {
1363+
Ok(list_checkpoints) => Some(StorageStatusDetails {
1364+
checkpoints: list_checkpoints,
1365+
}),
1366+
Err(e) => {
1367+
error!(
1368+
"Unable to read checkpoints; storage status details are not provided. Error: {e}"
1369+
);
1370+
None
1371+
}
1372+
},
1373+
None => None,
1374+
};
1375+
1376+
// Current status
13581377
match state.controller() {
13591378
Ok(controller) => {
13601379
fn inner_status(
13611380
runtime_desired_status: RuntimeDesiredStatus,
13621381
controller: &Controller,
13631382
default_status: RuntimeStatus,
1383+
storage_status_details: Option<StorageStatusDetails>,
13641384
) -> ExtendedRuntimeStatus {
13651385
ExtendedRuntimeStatus {
13661386
runtime_status: if controller.status().bootstrap_in_progress() {
@@ -1372,6 +1392,7 @@ fn get_status(state: &ServerState) -> Result<ExtendedRuntimeStatus, ExtendedRunt
13721392
},
13731393
runtime_status_details: json!(""),
13741394
runtime_desired_status,
1395+
storage_status_details,
13751396
}
13761397
}
13771398

@@ -1380,11 +1401,13 @@ fn get_status(state: &ServerState) -> Result<ExtendedRuntimeStatus, ExtendedRunt
13801401
runtime_desired_status,
13811402
&controller,
13821403
RuntimeStatus::Paused,
1404+
storage_status_details,
13831405
)),
13841406
PipelineState::Running => Ok(inner_status(
13851407
runtime_desired_status,
13861408
&controller,
13871409
RuntimeStatus::Running,
1410+
storage_status_details,
13881411
)),
13891412
PipelineState::Terminated => Err(ExtendedRuntimeStatusError {
13901413
status_code: StatusCode::INTERNAL_SERVER_ERROR,
@@ -1408,21 +1431,25 @@ fn get_status(state: &ServerState) -> Result<ExtendedRuntimeStatus, ExtendedRunt
14081431
runtime_status: RuntimeStatus::Initializing,
14091432
runtime_status_details: json!(""),
14101433
runtime_desired_status,
1434+
storage_status_details,
14111435
}),
14121436
InitializationState::DownloadingCheckpoint => Ok(ExtendedRuntimeStatus {
14131437
runtime_status: RuntimeStatus::Initializing,
14141438
runtime_status_details: json!("downloading checkpoint from object storage"),
14151439
runtime_desired_status,
1440+
storage_status_details,
14161441
}),
14171442
InitializationState::Standby => Ok(ExtendedRuntimeStatus {
14181443
runtime_status: RuntimeStatus::Standby,
14191444
runtime_status_details: json!(""),
14201445
runtime_desired_status,
1446+
storage_status_details,
14211447
}),
14221448
InitializationState::AwaitingApproval(diff) => Ok(ExtendedRuntimeStatus {
14231449
runtime_status: RuntimeStatus::AwaitingApproval,
14241450
runtime_status_details: serde_json::to_value(&diff).unwrap_or_default(),
14251451
runtime_desired_status,
1452+
storage_status_details,
14261453
}),
14271454
},
14281455
PipelinePhase::InitializationError(e) => {
@@ -1454,6 +1481,7 @@ fn get_status(state: &ServerState) -> Result<ExtendedRuntimeStatus, ExtendedRunt
14541481
runtime_status: RuntimeStatus::Replaying,
14551482
runtime_status_details: json!(""),
14561483
runtime_desired_status,
1484+
storage_status_details,
14571485
})
14581486
} else {
14591487
let mut status_code = e.status_code();
@@ -1480,6 +1508,7 @@ fn get_status(state: &ServerState) -> Result<ExtendedRuntimeStatus, ExtendedRunt
14801508
runtime_status: RuntimeStatus::Suspended,
14811509
runtime_status_details: json!(""),
14821510
runtime_desired_status,
1511+
storage_status_details,
14831512
}),
14841513
}
14851514
}

crates/feldera-types/src/checkpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub struct CheckpointSyncFailure {
110110

111111
/// Holds meta-data about a checkpoint that was taken for persistent storage
112112
/// and recovery of a circuit's state.
113-
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]
113+
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
114114
pub struct CheckpointMetadata {
115115
/// A unique identifier for the given checkpoint.
116116
///

crates/feldera-types/src/runtime_status.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use crate::checkpoint::CheckpointMetadata;
12
use crate::error::ErrorResponse;
23
use actix_web::body::BoxBody;
34
use actix_web::http::StatusCode;
45
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
56
use bytemuck::NoUninit;
67
use clap::ValueEnum;
78
use serde::{Deserialize, Serialize};
9+
use std::collections::VecDeque;
810
use std::fmt;
911
use std::fmt::Display;
1012
use utoipa::ToSchema;
@@ -14,6 +16,7 @@ use utoipa::ToSchema;
1416
/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
1517
/// determined by the pipeline and taken over by the runner.
1618
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema, NoUninit)]
19+
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
1720
#[repr(u8)]
1821
pub enum RuntimeStatus {
1922
/// The runner was unable to determine the pipeline runtime status. This status is never
@@ -74,6 +77,7 @@ impl From<RuntimeDesiredStatus> for RuntimeStatus {
7477
}
7578

7679
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, ValueEnum)]
80+
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
7781
pub enum RuntimeDesiredStatus {
7882
Unavailable,
7983
Coordination,
@@ -219,6 +223,14 @@ impl Display for BootstrapPolicy {
219223
}
220224
}
221225

226+
/// Details about pipeline storage, which are returned as part of the regular runtime status polling
227+
/// by the runner.
228+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
229+
pub struct StorageStatusDetails {
230+
/// Present checkpoints.
231+
pub checkpoints: VecDeque<CheckpointMetadata>,
232+
}
233+
222234
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
223235
pub struct ExtendedRuntimeStatus {
224236
/// Runtime status of the pipeline.
@@ -231,6 +243,13 @@ pub struct ExtendedRuntimeStatus {
231243

232244
/// Runtime desired status of the pipeline.
233245
pub runtime_desired_status: RuntimeDesiredStatus,
246+
247+
/// Details about the pipeline persistent storage.
248+
///
249+
/// `None` indicates that the pipeline in its current runtime status is unable to check the
250+
/// storage status details. Returning `None` _does not_ override the already existing storage
251+
/// status details in the database of the runner.
252+
pub storage_status_details: Option<StorageStatusDetails>,
234253
}
235254

236255
impl Responder for ExtendedRuntimeStatus {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- `storage_status_details` contains a JSON value encoded as a string.
2+
ALTER TABLE pipeline
3+
ADD COLUMN storage_status_details VARCHAR NULL;
4+
5+
-- `suspend_info` has been superseded by `storage_status_details`.
6+
ALTER TABLE pipeline
7+
DROP COLUMN suspend_info;

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,7 +1246,7 @@ pub(crate) async fn get_checkpoint_status(
12461246
request: HttpRequest,
12471247
) -> Result<HttpResponse, ManagerError> {
12481248
let pipeline_name = path.into_inner();
1249-
state
1249+
let result = state
12501250
.runner
12511251
.forward_http_request_to_pipeline_by_name(
12521252
client.as_ref(),
@@ -1258,7 +1258,14 @@ pub(crate) async fn get_checkpoint_status(
12581258
None,
12591259
None,
12601260
)
1261+
.await;
1262+
state
1263+
.db
1264+
.lock()
12611265
.await
1266+
.increment_notify_counter(*tenant_id, &pipeline_name)
1267+
.await?;
1268+
result
12621269
}
12631270

12641271
/// Get Checkpoint Sync Status
@@ -1356,7 +1363,7 @@ pub(crate) async fn get_checkpoints(
13561363
request: HttpRequest,
13571364
) -> Result<HttpResponse, ManagerError> {
13581365
let pipeline_name = path.into_inner();
1359-
state
1366+
let result = state
13601367
.runner
13611368
.forward_http_request_to_pipeline_by_name(
13621369
client.as_ref(),
@@ -1368,7 +1375,14 @@ pub(crate) async fn get_checkpoints(
13681375
None,
13691376
None,
13701377
)
1378+
.await;
1379+
state
1380+
.db
1381+
.lock()
13711382
.await
1383+
.increment_notify_counter(*tenant_id, &pipeline_name)
1384+
.await?;
1385+
result
13721386
}
13731387

13741388
/// Start a Samply profile

crates/pipeline-manager/src/api/endpoints/pipeline_management.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub struct PipelineInfo {
108108
pub deployment_error: Option<ErrorResponse>,
109109
pub refresh_version: Version,
110110
pub storage_status: StorageStatus,
111+
pub storage_status_details: Option<serde_json::Value>,
111112
pub deployment_id: Option<Uuid>,
112113
pub deployment_initial: Option<RuntimeDesiredStatus>,
113114
pub deployment_status: CombinedStatus,
@@ -156,6 +157,7 @@ pub struct PipelineInfoInternal {
156157
pub deployment_error: Option<ErrorResponse>,
157158
pub refresh_version: Version,
158159
pub storage_status: StorageStatus,
160+
pub storage_status_details: Option<serde_json::Value>,
159161
pub deployment_id: Option<Uuid>,
160162
pub deployment_initial: Option<RuntimeDesiredStatus>,
161163
pub deployment_status: CombinedStatus,
@@ -196,6 +198,7 @@ impl PipelineInfoInternal {
196198
deployment_error: extended_pipeline.deployment_error,
197199
refresh_version: extended_pipeline.refresh_version,
198200
storage_status: extended_pipeline.storage_status,
201+
storage_status_details: extended_pipeline.storage_status_details,
199202
deployment_id: extended_pipeline.deployment_id,
200203
deployment_initial: extended_pipeline.deployment_initial,
201204
deployment_status: CombinedStatus::new(
@@ -264,6 +267,7 @@ pub struct PipelineSelectedInfo {
264267
pub deployment_error: Option<ErrorResponse>,
265268
pub refresh_version: Version,
266269
pub storage_status: StorageStatus,
270+
pub storage_status_details: Option<serde_json::Value>,
267271
pub deployment_id: Option<Uuid>,
268272
pub deployment_initial: Option<RuntimeDesiredStatus>,
269273
pub deployment_status: CombinedStatus,
@@ -320,6 +324,7 @@ pub struct PipelineSelectedInfoInternal {
320324
pub deployment_error: Option<ErrorResponse>,
321325
pub refresh_version: Version,
322326
pub storage_status: StorageStatus,
327+
pub storage_status_details: Option<serde_json::Value>,
323328
pub deployment_id: Option<Uuid>,
324329
pub deployment_initial: Option<RuntimeDesiredStatus>,
325330
pub deployment_status: CombinedStatus,
@@ -366,6 +371,7 @@ impl PipelineSelectedInfoInternal {
366371
deployment_error: extended_pipeline.deployment_error,
367372
refresh_version: extended_pipeline.refresh_version,
368373
storage_status: extended_pipeline.storage_status,
374+
storage_status_details: extended_pipeline.storage_status_details,
369375
deployment_id: extended_pipeline.deployment_id,
370376
deployment_initial: extended_pipeline.deployment_initial,
371377
deployment_status: CombinedStatus::new(
@@ -426,6 +432,7 @@ impl PipelineSelectedInfoInternal {
426432
deployment_error: extended_pipeline.deployment_error,
427433
refresh_version: extended_pipeline.refresh_version,
428434
storage_status: extended_pipeline.storage_status,
435+
storage_status_details: extended_pipeline.storage_status_details,
429436
deployment_id: extended_pipeline.deployment_id,
430437
deployment_initial: extended_pipeline.deployment_initial,
431438
deployment_status: CombinedStatus::new(
@@ -498,6 +505,7 @@ pub enum PipelineFieldSelector {
498505
/// - `deployment_error`
499506
/// - `refresh_version`
500507
/// - `storage_status`
508+
/// - `storage_status_details`
501509
/// - `deployment_id`
502510
/// - `deployment_initial`
503511
/// - `deployment_status`
@@ -531,6 +539,7 @@ pub enum PipelineFieldSelector {
531539
/// - `deployment_error`
532540
/// - `refresh_version`
533541
/// - `storage_status`
542+
/// - `storage_status_details`
534543
/// - `deployment_id`
535544
/// - `deployment_initial`
536545
/// - `deployment_status`

crates/pipeline-manager/src/api/examples.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr {
6666
deployment_location: None,
6767
deployment_error: None,
6868
refresh_version: Version(4),
69-
suspend_info: None,
7069
storage_status: StorageStatus::Cleared,
70+
storage_status_details: None,
7171
deployment_id: None,
7272
deployment_initial: None,
7373
bootstrap_policy: None,
@@ -153,8 +153,8 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
153153
deployment_location: None,
154154
deployment_error: None,
155155
refresh_version: Version(1),
156-
suspend_info: None,
157156
storage_status: StorageStatus::Cleared,
157+
storage_status_details: None,
158158
deployment_id: None,
159159
deployment_initial: None,
160160
bootstrap_policy: None,
@@ -206,6 +206,7 @@ fn pipeline_info_internal_to_external(pipeline: PipelineInfoInternal) -> Pipelin
206206
deployment_error: pipeline.deployment_error,
207207
refresh_version: pipeline.refresh_version,
208208
storage_status: pipeline.storage_status,
209+
storage_status_details: pipeline.storage_status_details,
209210
deployment_id: pipeline.deployment_id,
210211
deployment_initial: pipeline.deployment_initial,
211212
deployment_status: pipeline.deployment_status,
@@ -267,6 +268,7 @@ fn pipeline_selected_info_internal_to_external(
267268
deployment_error: pipeline.deployment_error,
268269
refresh_version: pipeline.refresh_version,
269270
storage_status: pipeline.storage_status,
271+
storage_status_details: pipeline.storage_status_details,
270272
deployment_id: pipeline.deployment_id,
271273
deployment_initial: pipeline.deployment_initial,
272274
deployment_status: pipeline.deployment_status,

0 commit comments

Comments
 (0)