Skip to content

Commit 030ef3a

Browse files
committed
update based on state rework
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
1 parent b48b578 commit 030ef3a

File tree

5 files changed

+231
-75
lines changed

5 files changed

+231
-75
lines changed

crates/pipeline-manager/migrations/V28__add_pipeline_lifecycle_history.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ CREATE TABLE IF NOT EXISTS pipeline_lifecycle_events (
22
event_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
33
pipeline_id uuid NOT NULL,
44
tenant_id uuid NOT NULL,
5-
deployment_status varchar NOT NULL,
5+
deployment_resources_status varchar NOT NULL,
6+
deployment_runtime_status varchar,
7+
deployment_runtime_desired_status varchar,
68
info text,
79
recorded_at TIMESTAMP NOT NULL DEFAULT now(),
810
FOREIGN KEY (pipeline_id) REFERENCES pipeline(id) ON DELETE CASCADE,

crates/pipeline-manager/src/api/lifecycle_events.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@ use serde::{Deserialize, Serialize};
33
use utoipa::ToSchema;
44
use uuid::Uuid;
55

6-
use crate::db::types::pipeline::PipelineStatus;
7-
86
// Pipeline Lifecycle Events
97
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
108
pub struct PipelineLifecycleEvent {
119
pub event_id: Uuid,
12-
// pub tenant_id: TenantId,
13-
// pub pipeline_id: PipelineId,
14-
pub deployment_status: PipelineStatus,
10+
pub deployment_resources_status: String,
11+
pub deployment_runtime_status: Option<String>,
12+
pub deployment_runtime_desired_status: Option<String>,
1513
pub info: Option<String>,
1614
pub recorded_at: NaiveDateTime,
1715
}

crates/pipeline-manager/src/db/operations/pipeline.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,6 +1326,10 @@ pub(crate) async fn set_deployment_resources_status(
13261326
Some(ref v) => Some(serialize_error_response(v)?),
13271327
};
13281328

1329+
let deployment_resources_status = new_deployment_resources_status.to_string(); // $7: deployment_resources_status,
1330+
let deployment_runtime_status = final_deployment_runtime_status.map(runtime_status_to_string); // $8: deployment_runtime_status,
1331+
let deployment_runtime_desired_status = final_deployment_runtime_desired_status.map(runtime_desired_status_to_string); // $9: deployment_runtime_desired_status,
1332+
13291333
// Execute query
13301334
let stmt = txn
13311335
.prepare_cached(
@@ -1349,18 +1353,15 @@ pub(crate) async fn set_deployment_resources_status(
13491353
.execute(
13501354
&stmt,
13511355
&[
1352-
&match final_deployment_error {
1353-
None => None,
1354-
Some(v) => Some(serialize_error_response(&v)?),
1355-
}, // $1: deployment_error
1356+
&final_deployment_error, // $1: deployment_error
13561357
&final_deployment_config.map(|v| v.to_string()), // $2: deployment_config
13571358
&final_deployment_location, // $3: deployment_location
13581359
&final_suspend_info.map(|v| v.to_string()), // $4: suspend_info
13591360
&final_deployment_id, // $5: deployment_id
13601361
&final_deployment_initial.map(runtime_desired_status_to_string), // $6: deployment_initial
1361-
&new_deployment_resources_status.to_string(), // $7: deployment_resources_status,
1362-
&final_deployment_runtime_status.map(runtime_status_to_string), // $8: deployment_runtime_status,
1363-
&final_deployment_runtime_desired_status.map(runtime_desired_status_to_string), // $9: deployment_runtime_desired_status,
1362+
&deployment_resources_status, // $7: deployment_resources_status,
1363+
&deployment_runtime_status, // $8: deployment_runtime_status,
1364+
&deployment_runtime_desired_status, // $9: deployment_runtime_desired_status,
13641365
&tenant_id.0, // $10: tenant_id
13651366
&pipeline_id.0, // $11: id
13661367
],
@@ -1373,7 +1374,9 @@ pub(crate) async fn set_deployment_resources_status(
13731374
txn,
13741375
tenant_id,
13751376
pipeline_id,
1376-
new_deployment_status,
1377+
&deployment_resources_status, // $7: deployment_resources_status,
1378+
&deployment_runtime_status, // $8: deployment_runtime_status,
1379+
&deployment_runtime_desired_status, // $9: deployment_runtime_desired_status,
13771380
&final_deployment_error,
13781381
)
13791382
.await?;
@@ -1692,13 +1695,15 @@ pub(crate) async fn store_pipeline_lifecycle_event(
16921695
transaction: &Transaction<'_>,
16931696
tenant_id: TenantId,
16941697
pipeline_id: PipelineId,
1695-
deployment_status: PipelineStatus,
1698+
deployment_resources_status: &str,
1699+
deployment_runtime_status: &Option<String>,
1700+
deployment_runtime_desired_status: &Option<String>,
16961701
info: &Option<impl AsRef<str>>,
16971702
) -> Result<(), DBError> {
16981703
let query = r#"
16991704
INSERT INTO pipeline_lifecycle_events
1700-
(tenant_id, pipeline_id, deployment_status, info)
1701-
VALUES ($1, $2, $3, $4)
1705+
(tenant_id, pipeline_id, deployment_resources_status, deployment_runtime_status, deployment_runtime_desired_status, info)
1706+
VALUES ($1, $2, $3, $4, $5, $6)
17021707
"#;
17031708

17041709
transaction
@@ -1707,7 +1712,9 @@ pub(crate) async fn store_pipeline_lifecycle_event(
17071712
&[
17081713
&tenant_id.0,
17091714
&pipeline_id.0,
1710-
&deployment_status.to_string(),
1715+
&deployment_resources_status,
1716+
&deployment_runtime_status,
1717+
&deployment_runtime_desired_status,
17111718
&info.as_ref().map(|v| v.as_ref()),
17121719
],
17131720
)
@@ -1724,7 +1731,7 @@ pub(crate) async fn get_pipeline_lifecycle_events(
17241731
limit: u32,
17251732
) -> Result<Vec<PipelineLifecycleEvent>, DBError> {
17261733
let stmt = r#"
1727-
SELECT p.event_id, p.deployment_status, p.info, p.recorded_at
1734+
SELECT p.event_id, p.deployment_resources_status, p.deployment_runtime_status, p.deployment_runtime_desired_status, p.info, p.recorded_at
17281735
FROM pipeline_lifecycle_events as p
17291736
WHERE p.tenant_id = $1 AND p.pipeline_id = $2
17301737
ORDER BY p.recorded_at ASC
@@ -1739,18 +1746,13 @@ pub(crate) async fn get_pipeline_lifecycle_events(
17391746

17401747
let ret: Vec<PipelineLifecycleEvent> = rows
17411748
.into_iter()
1742-
.filter_map(|row| {
1743-
if let Ok(status) = row.get::<_, String>(1).try_into() {
1744-
let event = PipelineLifecycleEvent {
1745-
event_id: row.get(0),
1746-
deployment_status: status,
1747-
info: row.get(2),
1748-
recorded_at: row.get(3),
1749-
};
1750-
Some(event)
1751-
} else {
1752-
None
1753-
}
1749+
.map(|row| PipelineLifecycleEvent {
1750+
event_id: row.get(0),
1751+
deployment_resources_status: row.get(1),
1752+
deployment_runtime_status: row.get(2),
1753+
deployment_runtime_desired_status: row.get(3),
1754+
info: row.get(4),
1755+
recorded_at: row.get(5),
17541756
})
17551757
.collect();
17561758
Ok(ret)

crates/pipeline-manager/src/db/storage_postgres.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use deadpool_postgres::{Manager, Pool, RecyclingMethod};
2727
use feldera_types::config::{PipelineConfig, RuntimeConfig};
2828
use feldera_types::error::ErrorResponse;
29+
use feldera_types::runtime_status::{ExtendedRuntimeStatus, RuntimeDesiredStatus};
2930
use log::{debug, info, log, warn, Level};
3031
use tokio_postgres::Row;
3132
use uuid::Uuid;

0 commit comments

Comments
 (0)