Skip to content

Commit 9d6af95

Browse files
committed
pipeline-manager: track pipeline lifecycle history
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
1 parent eb762d8 commit 9d6af95

File tree

6 files changed

+506
-3
lines changed

6 files changed

+506
-3
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Append-only history of pipeline deployment status changes.
-- One row is inserted each time a pipeline's deployment status is updated;
-- rows older than the retention period are removed by a periodic cleanup job.
CREATE TABLE IF NOT EXISTS pipeline_lifecycle_events (
    -- bigint: identity values are never reused after cleanup, so the counter
    -- only ever grows over the lifetime of the installation.
    event_id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    pipeline_id uuid NOT NULL,
    tenant_id uuid NOT NULL,
    -- Pipeline name at the time the event was recorded.
    name varchar NOT NULL,
    -- Textual form of the deployment status the pipeline transitioned to.
    deployment_status varchar NOT NULL,
    -- Serialized error response accompanying the transition, if any.
    info text,
    -- Kept as TIMESTAMP (without time zone): the application reads this
    -- column as a naive date-time value.
    recorded_at TIMESTAMP NOT NULL DEFAULT now(),
    FOREIGN KEY (pipeline_id) REFERENCES pipeline(id) ON DELETE CASCADE,
    FOREIGN KEY (tenant_id) REFERENCES tenant(id) ON DELETE CASCADE
);

-- PostgreSQL does not index the referencing side of a foreign key on its own.
-- Without this index every pipeline deletion (ON DELETE CASCADE) and every
-- per-pipeline history lookup has to scan the whole table.
CREATE INDEX IF NOT EXISTS pipeline_lifecycle_events_pipeline_id_recorded_at_idx
    ON pipeline_lifecycle_events (pipeline_id, recorded_at);

crates/pipeline-manager/src/bin/pipeline-manager.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use clap::{Args, Command, FromArgMatches};
44

55
use colored::Colorize;
6-
use log::info;
6+
use log::{info, warn};
77
use pipeline_manager::api::main::ApiDoc;
88
use pipeline_manager::cluster_health::regular_health_check;
99
use pipeline_manager::compiler::main::{compiler_main, compiler_precompile};
@@ -17,6 +17,7 @@ use pipeline_manager::runner::local_runner::LocalRunner;
1717
use pipeline_manager::runner::main::runner_main;
1818
use pipeline_manager::{ensure_default_crypto_provider, init_fd_limit, platform_enable_unstable};
1919
use std::sync::Arc;
20+
use std::time::Duration;
2021
use tokio::sync::{Mutex, RwLock};
2122
use utoipa::OpenApi;
2223

@@ -88,6 +89,7 @@ async fn main() -> anyhow::Result<()> {
8889

8990
// Run migrations before starting any service
9091
db.run_migrations().await?;
92+
9193
let db = Arc::new(Mutex::new(db));
9294
let db_clone = db.clone();
9395
let common_config_clone = common_config.clone();
@@ -103,6 +105,39 @@ async fn main() -> anyhow::Result<()> {
103105
.await;
104106
});
105107

108+
// TODO: we should get retention period in days from config
109+
let retention_period = "7 days".to_string();
110+
// execute cleanup every 24 hours
111+
let cleanup_interval = Duration::from_secs(60 * 60 * 24);
112+
113+
let db_clone = db.clone();
114+
tokio::spawn({
115+
async move {
116+
let mut interval = tokio::time::interval(cleanup_interval);
117+
118+
loop {
119+
interval.tick().await;
120+
match db_clone
121+
.lock()
122+
.await
123+
.cleanup_pipeline_lifecycle_events(&retention_period)
124+
.await
125+
{
126+
Ok(deleted) => {
127+
info!(
128+
"Pipeline lifecycle cleanup job ran ( retention period: {retention_period} ): deleted {deleted} records."
129+
);
130+
}
131+
Err(e) => {
132+
warn!(
133+
"Pipeline lifecycle cleanup job failed ( retention period: {retention_period} ): {e}."
134+
);
135+
}
136+
}
137+
}
138+
}
139+
});
140+
106141
let health_check = Arc::new(RwLock::new(None));
107142
let health_check_handle = health_check.clone();
108143
let common_config_clone = common_config.clone();

crates/pipeline-manager/src/db/operations/pipeline.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::db::types::utils::{
1919
validate_runtime_config,
2020
};
2121
use crate::db::types::version::Version;
22+
use chrono::NaiveDateTime;
2223
use deadpool_postgres::Transaction;
2324
use feldera_types::error::ErrorResponse;
2425
use log::{error, warn};
@@ -1157,7 +1158,7 @@ pub(crate) async fn set_deployment_status(
11571158
&new_deployment_status.to_string(),
11581159
&match final_deployment_error {
11591160
None => None,
1160-
Some(v) => Some(serialize_error_response(&v)?),
1161+
Some(ref v) => Some(serialize_error_response(v)?),
11611162
},
11621163
&final_deployment_config.map(|v| v.to_string()),
11631164
&final_deployment_location,
@@ -1167,7 +1168,18 @@ pub(crate) async fn set_deployment_status(
11671168
],
11681169
)
11691170
.await?;
1171+
11701172
if rows_affected > 0 {
1173+
// update lifecycle event.
1174+
store_pipeline_lifecycle_event(
1175+
txn,
1176+
tenant_id,
1177+
pipeline_id,
1178+
&current.name,
1179+
new_deployment_status,
1180+
&final_deployment_error,
1181+
)
1182+
.await?;
11711183
Ok(())
11721184
} else {
11731185
Err(DBError::UnknownPipeline { pipeline_id })
@@ -1455,6 +1467,72 @@ pub(crate) async fn get_support_bundle_data(
14551467
Ok(bundles)
14561468
}
14571469

1470+
/// Store a pipeline lifecycle event entry
1471+
pub(crate) async fn store_pipeline_lifecycle_event(
1472+
transaction: &Transaction<'_>,
1473+
tenant_id: TenantId,
1474+
pipeline_id: PipelineId,
1475+
name: &str,
1476+
deployment_status: PipelineStatus,
1477+
info: &Option<ErrorResponse>,
1478+
) -> Result<(), DBError> {
1479+
let query = r#"
1480+
INSERT INTO pipeline_lifecycle_events
1481+
(tenant_id, pipeline_id, name, deployment_status, info)
1482+
VALUES ($1, $2, $3, $4, $5)
1483+
"#;
1484+
1485+
transaction
1486+
.execute(
1487+
query,
1488+
&[
1489+
&tenant_id.0,
1490+
&pipeline_id.0,
1491+
&name,
1492+
&deployment_status.to_string(),
1493+
&match info {
1494+
None => None,
1495+
Some(v) => Some(serialize_error_response(v)?),
1496+
},
1497+
],
1498+
)
1499+
.await?;
1500+
1501+
Ok(())
1502+
}
1503+
1504+
/// Get lifecycle events for a pipeline.
1505+
pub(crate) async fn get_pipeline_lifecycle_events(
1506+
txn: &Transaction<'_>,
1507+
tenant_id: TenantId,
1508+
pipeline_id: PipelineId,
1509+
limit: i64,
1510+
) -> Result<Vec<(PipelineStatus, Option<String>, NaiveDateTime)>, DBError> {
1511+
let stmt = r#"
1512+
SELECT p.deployment_status, p.info, p.recorded_at
1513+
FROM pipeline_lifecycle_events as p
1514+
WHERE p.tenant_id = $1 AND p.pipeline_id = $2
1515+
ORDER BY p.recorded_at ASC
1516+
LIMIT $3
1517+
"#;
1518+
1519+
let rows = txn
1520+
.query(stmt, &[&tenant_id.0, &pipeline_id.0, &limit])
1521+
.await?;
1522+
1523+
let ret: Vec<(PipelineStatus, Option<String>, NaiveDateTime)> = rows
1524+
.into_iter()
1525+
.filter_map(|row| {
1526+
if let Ok(status) = row.get::<_, String>(0).try_into() {
1527+
Some((status, row.get(1), row.get(2)))
1528+
} else {
1529+
None
1530+
}
1531+
})
1532+
.collect();
1533+
Ok(ret)
1534+
}
1535+
14581536
#[cfg(test)]
14591537
mod tests {
14601538
use crate::db::error::DBError;

crates/pipeline-manager/src/db/storage.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use crate::db::error::DBError;
33
use crate::db::types::api_key::{ApiKeyDescr, ApiPermission};
44
use crate::db::types::pipeline::{
55
ExtendedPipelineDescr, ExtendedPipelineDescrMonitoring, PipelineDescr, PipelineId,
6+
PipelineStatus,
67
};
78
use crate::db::types::program::{RustCompilationInfo, SqlCompilationInfo};
89
use crate::db::types::tenant::TenantId;
910
use crate::db::types::version::Version;
1011
use async_trait::async_trait;
12+
use chrono::NaiveDateTime;
1113
use feldera_types::error::ErrorResponse;
1214
use uuid::Uuid;
1315

@@ -419,4 +421,12 @@ pub(crate) trait Storage {
419421
pipeline_name: &str,
420422
how_many: u64,
421423
) -> Result<(ExtendedPipelineDescrMonitoring, Vec<SupportBundleData>), DBError>;
424+
425+
/// Retrieves at most `how_many` lifecycle events recorded for the pipeline
/// `pipeline_id` belonging to tenant `tenant_id`.
///
/// Each returned tuple holds the deployment status of the event, the optional
/// info text stored with it, and the time at which it was recorded.
// NOTE: this is only used for testing currently
426+
async fn get_pipeline_lifecycle_events(
427+
&self,
428+
tenant_id: TenantId,
429+
pipeline_id: PipelineId,
430+
how_many: i64,
431+
) -> Result<Vec<(PipelineStatus, Option<String>, NaiveDateTime)>, DBError>;
422432
}

crates/pipeline-manager/src/db/storage_postgres.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::db::types::tenant::TenantId;
1919
use crate::db::types::version::Version;
2020
use crate::{auth::TenantRecord, config::DatabaseConfig};
2121
use async_trait::async_trait;
22+
use chrono::NaiveDateTime;
2223
use deadpool_postgres::{Manager, Pool, RecyclingMethod};
2324
use feldera_types::config::{PipelineConfig, RuntimeConfig};
2425
use feldera_types::error::ErrorResponse;
@@ -1112,6 +1113,25 @@ impl Storage for StoragePostgres {
11121113
txn.commit().await?;
11131114
Ok((pipeline, bundle_data))
11141115
}
1116+
1117+
async fn get_pipeline_lifecycle_events(
1118+
&self,
1119+
tenant_id: TenantId,
1120+
pipeline_id: PipelineId,
1121+
how_many: i64,
1122+
) -> Result<Vec<(PipelineStatus, Option<String>, NaiveDateTime)>, DBError> {
1123+
let mut client = self.pool.get().await?;
1124+
let txn = client.transaction().await?;
1125+
let events = operations::pipeline::get_pipeline_lifecycle_events(
1126+
&txn,
1127+
tenant_id,
1128+
pipeline_id,
1129+
how_many,
1130+
)
1131+
.await?;
1132+
txn.commit().await?;
1133+
Ok(events)
1134+
}
11151135
}
11161136

11171137
impl StoragePostgres {
@@ -1308,4 +1328,34 @@ impl StoragePostgres {
13081328
})
13091329
}
13101330
}
1331+
1332+
/// Deletes records from pipeline_lifecycle_events older than the given retention period.
1333+
///
1334+
/// # Arguments
1335+
/// * `retention_period` - A string that specifies the interval for data retention. This should be a format compatible with PostgreSQL INTERVAL types, such as:
1336+
/// - "7 days"
1337+
/// - "1 month"
1338+
/// - "3 hours"
1339+
/// - "2 weeks 3 days"
1340+
/// - "6 months 5 days 12 hours"
1341+
/// Any valid PostgreSQL interval string is acceptable (e.g., "1 day", "30 minutes").
1342+
/// See: https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-INTERVAL-INPUT
1343+
///
1344+
/// # Example
1345+
/// cleanup_pipeline_lifecycle_events("30 days").await?;
1346+
///
1347+
/// Records with `recorded_at` timestamp older than (now - retention_period) will be deleted.
1348+
/// Returns the number of records deleted, or an error if the operation fails.
1349+
pub async fn cleanup_pipeline_lifecycle_events(
1350+
&self,
1351+
retention_period: &str,
1352+
) -> Result<u64, DBError> {
1353+
let statement = format!("DELETE FROM pipeline_lifecycle_events WHERE recorded_at < NOW() - INTERVAL '{retention_period}'");
1354+
1355+
let mut client = self.pool.get().await?;
1356+
let txn = client.transaction().await?;
1357+
let deleted = txn.execute(&statement, &[]).await?;
1358+
txn.commit().await?;
1359+
Ok(deleted)
1360+
}
13111361
}

0 commit comments

Comments
 (0)