Skip to content

Commit 4b0d24b

Browse files
committed
pipeline-manager: fix starting pipeline while clearing storage
Before it was possible to (early) start a pipeline while storage was still clearing, and to clear storage while still awaiting the transition of the resources status during early start. Only once the resources status transitioned, was it no longer possible for storage status to change. Changes: - No longer allow a pipeline to be (early) started when the storage is still clearing, returning an error instead. - No longer allow storage to be cleared during early start, returning an error instead. - Always allow any of the other resources statuses to transition to `Stopping`, even if storage status does not match expectation. The same for transitioning from `Stopping` to `Stopped`. This is a fail-safe for if storage status ends up in an expected status due to an internal error. Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent b6384df commit 4b0d24b

File tree

7 files changed

+108
-6
lines changed

7 files changed

+108
-6
lines changed

crates/pipeline-manager/src/db/error.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ pub enum DBError {
175175
},
176176
StorageStatusImmutableUnlessStopped {
177177
resources_status: ResourcesStatus,
178+
resources_desired_status: ResourcesDesiredStatus,
178179
current_status: StorageStatus,
179180
new_status: StorageStatus,
180181
},
@@ -199,6 +200,7 @@ pub enum DBError {
199200
NoRuntimeStatusWhileProvisioned,
200201
PreconditionViolation(String),
201202
CannotStartWithUndismissedError,
203+
CannotStartWhileClearingStorage,
202204
DismissErrorRestrictedToFullyStopped,
203205
InitialImmutableUnlessStopped,
204206
InitialStandbyNotAllowed,
@@ -592,13 +594,14 @@ impl Display for DBError {
592594
}
593595
DBError::StorageStatusImmutableUnlessStopped {
594596
resources_status,
597+
resources_desired_status,
595598
current_status,
596599
new_status,
597600
} => {
598601
write!(
599602
f,
600-
"Cannot transition storage status from '{current_status}' to '{new_status}' with resources status '{resources_status}'. \
601-
Storage status cannot be changed unless the pipeline is stopped."
603+
"Cannot transition storage status from '{current_status}' to '{new_status}' with resources status '{resources_status}' and desired status '{resources_desired_status}'. \
604+
Storage status cannot be changed unless the pipeline is fully stopped."
602605
)
603606
}
604607
DBError::IllegalPipelineAction {
@@ -665,6 +668,13 @@ impl Display for DBError {
665668
first call `/dismiss_error`, after which `/start?dismiss_error=false` is again possible."
666669
)
667670
}
671+
DBError::CannotStartWhileClearingStorage => {
672+
write!(
673+
f,
674+
"Cannot process `/start` if the `storage_status` is `Clearing`. \
675+
Wait for the storage to become cleared before trying again."
676+
)
677+
}
668678
DBError::DismissErrorRestrictedToFullyStopped => {
669679
write!(
670680
f,
@@ -796,6 +806,7 @@ impl DetailedError for DBError {
796806
Self::PreconditionViolation(..) => Cow::from("PreconditionViolation"),
797807
Self::ResumeWhileNotProvisioned => Cow::from("ResumeWhileNotProvisioned"),
798808
Self::CannotStartWithUndismissedError => Cow::from("CannotStartWithUndismissedError"),
809+
Self::CannotStartWhileClearingStorage => Cow::from("CannotStartWhileClearingStorage"),
799810
Self::DismissErrorRestrictedToFullyStopped => {
800811
Cow::from("DismissErrorRestrictedToFullyStopped")
801812
}
@@ -873,6 +884,7 @@ impl ResponseError for DBError {
873884
Self::NoRuntimeStatusWhileProvisioned => StatusCode::INTERNAL_SERVER_ERROR,
874885
Self::PreconditionViolation(..) => StatusCode::INTERNAL_SERVER_ERROR,
875886
Self::CannotStartWithUndismissedError => StatusCode::BAD_REQUEST,
887+
Self::CannotStartWhileClearingStorage => StatusCode::BAD_REQUEST,
876888
Self::DismissErrorRestrictedToFullyStopped => StatusCode::BAD_REQUEST,
877889
Self::InitialImmutableUnlessStopped => StatusCode::BAD_REQUEST,
878890
Self::InitialStandbyNotAllowed => StatusCode::BAD_REQUEST,

crates/pipeline-manager/src/db/operations/pipeline.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,7 @@ pub(crate) async fn set_deployment_resources_desired_status(
11781178

11791179
// Check that the desired status can be set
11801180
validate_resources_desired_status_transition(
1181+
current.storage_status,
11811182
current.deployment_resources_status,
11821183
current.deployment_resources_desired_status,
11831184
final_deployment_error.clone(),
@@ -1738,6 +1739,7 @@ pub(crate) async fn set_storage_status(
17381739
// Check that the transition is permitted
17391740
validate_storage_status_transition(
17401741
current.deployment_resources_status,
1742+
current.deployment_resources_desired_status,
17411743
current.storage_status,
17421744
new_storage_status,
17431745
)?;

crates/pipeline-manager/src/db/test.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,14 @@ async fn pipeline_deployment() {
18931893
)
18941894
.await
18951895
.unwrap();
1896+
assert!(matches!(
1897+
handle
1898+
.db
1899+
.transit_storage_status_to_clearing_if_not_cleared(tenant_id, "example1",)
1900+
.await
1901+
.unwrap_err(),
1902+
DBError::StorageStatusImmutableUnlessStopped { .. }
1903+
));
18961904
handle
18971905
.db
18981906
.transit_deployment_resources_status_to_provisioning(
@@ -1962,6 +1970,30 @@ async fn pipeline_deployment() {
19621970
.transit_deployment_resources_status_to_stopped(tenant_id, pipeline1.id, Version(1))
19631971
.await
19641972
.unwrap();
1973+
handle
1974+
.db
1975+
.transit_storage_status_to_clearing_if_not_cleared(tenant_id, &pipeline1.name)
1976+
.await
1977+
.unwrap();
1978+
assert!(matches!(
1979+
handle
1980+
.db
1981+
.set_deployment_resources_desired_status_provisioned(
1982+
tenant_id,
1983+
"example1",
1984+
RuntimeDesiredStatus::Paused,
1985+
BootstrapPolicy::default(),
1986+
false,
1987+
)
1988+
.await
1989+
.unwrap_err(),
1990+
DBError::CannotStartWhileClearingStorage { .. }
1991+
));
1992+
handle
1993+
.db
1994+
.transit_storage_status_to_cleared(tenant_id, pipeline1.id)
1995+
.await
1996+
.unwrap();
19651997
handle
19661998
.db
19671999
.set_deployment_resources_desired_status_provisioned(
@@ -4256,6 +4288,7 @@ impl Storage for Mutex<DbModel> {
42564288
};
42574289
let new_resources_desired_status = ResourcesDesiredStatus::Provisioned;
42584290
validate_resources_desired_status_transition(
4291+
pipeline.storage_status,
42594292
pipeline.deployment_resources_status,
42604293
pipeline.deployment_resources_desired_status,
42614294
new_deployment_error.clone(),
@@ -4306,6 +4339,7 @@ impl Storage for Mutex<DbModel> {
43064339
if pipeline.deployment_resources_status != ResourcesStatus::Provisioned {
43074340
let new_resources_desired_status = ResourcesDesiredStatus::Stopped;
43084341
validate_resources_desired_status_transition(
4342+
pipeline.storage_status,
43094343
pipeline.deployment_resources_status,
43104344
pipeline.deployment_resources_desired_status,
43114345
pipeline.deployment_error.clone(),
@@ -4340,6 +4374,7 @@ impl Storage for Mutex<DbModel> {
43404374
let mut pipeline = self.get_pipeline(tenant_id, pipeline_name).await?;
43414375
let new_resources_desired_status = ResourcesDesiredStatus::Stopped;
43424376
validate_resources_desired_status_transition(
4377+
pipeline.storage_status,
43434378
pipeline.deployment_resources_status,
43444379
pipeline.deployment_resources_desired_status,
43454380
pipeline.deployment_error.clone(),
@@ -4384,6 +4419,7 @@ impl Storage for Mutex<DbModel> {
43844419
}
43854420
validate_storage_status_transition(
43864421
pipeline.deployment_resources_status,
4422+
pipeline.deployment_resources_desired_status,
43874423
pipeline.storage_status,
43884424
StorageStatus::InUse,
43894425
)?;
@@ -4666,6 +4702,7 @@ impl Storage for Mutex<DbModel> {
46664702
let new_status = StorageStatus::Clearing;
46674703
validate_storage_status_transition(
46684704
pipeline.deployment_resources_status,
4705+
pipeline.deployment_resources_desired_status,
46694706
pipeline.storage_status,
46704707
new_status,
46714708
)?;
@@ -4690,6 +4727,7 @@ impl Storage for Mutex<DbModel> {
46904727
let new_status = StorageStatus::Cleared;
46914728
validate_storage_status_transition(
46924729
pipeline.deployment_resources_status,
4730+
pipeline.deployment_resources_desired_status,
46934731
pipeline.storage_status,
46944732
new_status,
46954733
)?;

crates/pipeline-manager/src/db/types/resources_status.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use feldera_types::error::ErrorResponse;
44
use serde::{Deserialize, Serialize};
55
use std::fmt;
66
use std::fmt::Display;
7+
use tracing::error;
78
use utoipa::ToSchema;
89

910
/// Pipeline resources status.
@@ -174,7 +175,7 @@ pub fn validate_resources_status_transition(
174175
// Check rules on transitioning resources status
175176
if matches!(
176177
(storage_status, current_status, new_status),
177-
(StorageStatus::Cleared | StorageStatus::InUse, ResourcesStatus::Stopped, ResourcesStatus::Provisioning)
178+
(StorageStatus::Cleared | StorageStatus::InUse, ResourcesStatus::Stopped, ResourcesStatus::Provisioning)
178179
| (StorageStatus::Cleared | StorageStatus::InUse, ResourcesStatus::Stopped, ResourcesStatus::Stopping)
179180
| (StorageStatus::InUse, ResourcesStatus::Provisioning, ResourcesStatus::Provisioning)
180181
| (StorageStatus::InUse, ResourcesStatus::Provisioning, ResourcesStatus::Provisioned)
@@ -184,6 +185,16 @@ pub fn validate_resources_status_transition(
184185
| (StorageStatus::Cleared | StorageStatus::InUse, ResourcesStatus::Stopping, ResourcesStatus::Stopped)
185186
) {
186187
Ok(())
188+
} else if matches!(
189+
(storage_status, current_status, new_status),
190+
(_, ResourcesStatus::Stopped | ResourcesStatus::Provisioning | ResourcesStatus::Provisioned, ResourcesStatus::Stopping)
191+
| (_, ResourcesStatus::Stopping, ResourcesStatus::Stopped)
192+
) {
193+
// As a fail-safe, it's always possible to transition from any other status to Stopping and
194+
// from Stopping to Stopped. This however should not occur (instead the first matches should
195+
// have already caught them).
196+
error!("The resources status transition {current_status:?} -> {new_status:?} (storage status: {storage_status:?}) is taking place. This is due to an internal error, and is permitted only in order to recover from an invalid status. Please file a bug report.");
197+
Ok(())
187198
} else {
188199
Err(DBError::InvalidResourcesStatusTransition {
189200
storage_status,
@@ -195,6 +206,7 @@ pub fn validate_resources_status_transition(
195206

196207
/// Validates the resources desired status transition from current status to a new one.
197208
pub fn validate_resources_desired_status_transition(
209+
storage_status: StorageStatus,
198210
status: ResourcesStatus,
199211
current_desired_status: ResourcesDesiredStatus,
200212
new_deployment_error: Option<ErrorResponse>,
@@ -222,6 +234,10 @@ pub fn validate_resources_desired_status_transition(
222234
// If it experienced an error, it needs to be dismissed first
223235
return Err(DBError::CannotStartWithUndismissedError);
224236
}
237+
if storage_status == StorageStatus::Clearing {
238+
// If it is still clearing storage, wait for that to complete
239+
return Err(DBError::CannotStartWhileClearingStorage);
240+
}
225241
Ok(())
226242
}
227243
}

crates/pipeline-manager/src/db/types/storage.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::db::error::DBError;
2-
use crate::db::types::resources_status::ResourcesStatus;
2+
use crate::db::types::resources_status::{ResourcesDesiredStatus, ResourcesStatus};
33
use serde::{Deserialize, Serialize};
44
use std::convert::TryFrom;
55
use std::fmt;
@@ -79,12 +79,35 @@ impl Display for StorageStatus {
7979
/// Validates the storage status transition from current status to a new one.
8080
pub fn validate_storage_status_transition(
8181
resources_status: ResourcesStatus,
82+
resources_desired_status: ResourcesDesiredStatus,
8283
current_status: StorageStatus,
8384
new_status: StorageStatus,
8485
) -> Result<(), DBError> {
8586
if resources_status != ResourcesStatus::Stopped {
8687
return Err(DBError::StorageStatusImmutableUnlessStopped {
8788
resources_status,
89+
resources_desired_status,
90+
current_status,
91+
new_status,
92+
});
93+
}
94+
if resources_desired_status != ResourcesDesiredStatus::Stopped
95+
&& !matches!(
96+
(resources_desired_status, current_status, new_status),
97+
(
98+
ResourcesDesiredStatus::Provisioned,
99+
StorageStatus::Cleared,
100+
StorageStatus::InUse
101+
) | (
102+
ResourcesDesiredStatus::Provisioned,
103+
StorageStatus::InUse,
104+
StorageStatus::InUse
105+
)
106+
)
107+
{
108+
return Err(DBError::StorageStatusImmutableUnlessStopped {
109+
resources_status,
110+
resources_desired_status,
88111
current_status,
89112
new_status,
90113
});

docs.feldera.com/docs/changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import TabItem from '@theme/TabItem';
1414

1515
## Unreleased
1616

17+
Starting a pipeline while storage is still clearing (`storage_status=Clearing`) now returns
18+
`CannotStartWhileClearingStorage` instead of succeeding. Clearing storage while a start
19+
is in progress but hasn't yet transitioned to `Provisioning` now returns
20+
`StorageStatusImmutableUnlessStopped` instead of succeeding.
21+
1722
Backward-incompatible Delta Lake output connector change. The new `max_retries` setting configures
1823
the number of times the connector retries failed Delta Lake operations like writing Parquet files
1924
and committing transactions. The setting is unset by default, causing the connector to retry

python/tests/platform/test_pipeline_lifecycle.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from unittest import skip
21
from feldera.enums import PipelineStatus, ProgramStatus, StorageStatus
32
from feldera.rest.errors import FelderaAPIError
43
import time
@@ -387,8 +386,15 @@ def test_pipeline_clear_using_api(pipeline_name):
387386
pipeline.clear_storage(wait=False)
388387
assert pipeline.storage_status() in [StorageStatus.CLEARING, StorageStatus.CLEARED]
389388

389+
# Start just after might yield an error if it is still clearing
390+
try:
391+
pipeline.start()
392+
except FelderaAPIError as e:
393+
assert e.error_code == "CannotStartWhileClearingStorage"
394+
pipeline.stop(force=True)
395+
pipeline.clear_storage()
396+
390397

391-
@skip # Passing this test requires denying clearing when desired resources status is provisioned.
392398
@gen_pipeline_name
393399
def test_pipeline_clear_while_desired_provisioned(pipeline_name):
394400
"""

0 commit comments

Comments
 (0)