Skip to content

Commit b48b578

Browse files
committed
pipeline-manager: track pipeline lifecycle history
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
1 parent 09f2e45 commit b48b578

File tree

17 files changed

+862
-8
lines changed

17 files changed

+862
-8
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE TABLE IF NOT EXISTS pipeline_lifecycle_events (
2+
event_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
3+
pipeline_id uuid NOT NULL,
4+
tenant_id uuid NOT NULL,
5+
deployment_status varchar NOT NULL,
6+
info text,
7+
recorded_at TIMESTAMP NOT NULL DEFAULT now(),
8+
FOREIGN KEY (pipeline_id) REFERENCES pipeline(id) ON DELETE CASCADE,
9+
FOREIGN KEY (tenant_id) REFERENCES tenant(id) ON DELETE CASCADE
10+
);

crates/pipeline-manager/proptest-regressions/db/test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,4 @@ cc 2e05366358e733acf779daba9549e22bbb573636b4ff01ae589e8040284cdb4a # shrinks to
4343
cc 5902eed1de087fac3012980d700718483060cab6ad99c79204c11cd40753dc6c # shrinks to [NewOrUpdatePipeline(TenantId(03000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "pipeline-0", PipelineDescr { name: "pipeline-0", description: "", runtime_config: RuntimeConfig { workers: 0, storage: false, cpu_profiler: false, tracing: false, tracing_endpoint_jaeger: "", min_batch_size_records: 0, max_buffering_delay_usecs: 0, resources: ResourceConfig { cpu_cores_min: None, cpu_cores_max: None, memory_mb_min: None, memory_mb_max: None, storage_mb_max: None, storage_class: None }, min_storage_bytes: None }, program_code: "", program_config: ProgramConfig { profile: Some(Optimized) } }), NewOrUpdatePipeline(TenantId(03000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "pipeline-0", PipelineDescr { name: "pipeline-0", description: "", runtime_config: RuntimeConfig { workers: 0, storage: false, cpu_profiler: false, tracing: false, tracing_endpoint_jaeger: "", min_batch_size_records: 0, max_buffering_delay_usecs: 0, resources: ResourceConfig { cpu_cores_min: None, cpu_cores_max: None, memory_mb_min: None, memory_mb_max: None, storage_mb_max: None, storage_class: None }, min_storage_bytes: None }, program_code: "a", program_config: ProgramConfig { profile: Some(Optimized) } }), TransitProgramStatusToCompilingSql(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(01000000-0000-0000-0000-000000000000), Version(2)), TransitProgramStatusToCompilingRust(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(01000000-0000-0000-0000-000000000000), Version(2), ProgramInfo { schema: ProgramSchema { inputs: [], outputs: [] }, input_connectors: {}, output_connectors: {} }), TransitProgramStatusToPending(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(01000000-0000-0000-0000-000000000000), Version(2)), ListPipelines(TenantId(03000000-0000-0000-0000-000000000000))]
4444
cc c0ddd7741bf1667b3d70ea81ca4c298875235277256ee70f1fe2b6a9a50303ba # shrinks to [NewPipeline(TenantId(03000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, PipelineDescr { name: "pipeline-0", description: "", runtime_config: RuntimeConfig { workers: 0, storage: false, cpu_profiler: false, tracing: false, tracing_endpoint_jaeger: "", min_batch_size_records: 0, max_buffering_delay_usecs: 0, resources: ResourceConfig { cpu_cores_min: None, cpu_cores_max: None, memory_mb_min: None, memory_mb_max: None, storage_mb_max: None, storage_class: None }, min_storage_bytes: None }, program_code: "", program_config: ProgramConfig { profile: Some(Optimized) } }), TransitProgramStatusToCompilingSql(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToCompilingRust(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), ProgramInfo { schema: ProgramSchema { inputs: [], outputs: [] }, input_connectors: {}, output_connectors: {} }), TransitProgramStatusToSuccess(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), ""), TransitDeploymentStatusToFailed(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), ErrorResponse { message: "This is an example error response", error_code: "SomeExampleError", details: Object {"extra-info": Number(0)} }), TransitProgramStatusToCompilingSql(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1))]
4545
cc 126dfeb51d3f8f4707aaf4d12a3d6f90a0ac9fe7f12cffcaa014e8604dfb5f3c # shrinks to [NewPipeline(TenantId(01000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, "v0", PipelineDescr { name: "pipeline-4", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "dev_tweaks": Object {}}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false)} }), TransitProgramStatusToCompilingSql(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToSqlCompiled(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), SqlCompilationInfo { exit_code: 0, messages: [] }, Object {"schema": Object {"inputs": Array [], "outputs": Array []}, "main_rust": String("main-rust-0"), "udf_stubs": String("udf-stubs-0"), "dataflow": Null, "input_connectors": Object {}, "output_connectors": Object {}}), TransitProgramStatusToCompilingRust(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToSuccess(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), RustCompilationInfo { exit_code: 0, stdout: "stdout-0", stderr: "stderr-0" }, "source_checksum_0", "integrity_checksum_0", ""), NewOrUpdatePipeline(TenantId(01000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "pipeline-4", "v0", PipelineDescr { name: "pipeline-3", description: "\u{11cb6}^ $gn𑍐dÊ'kڦ=~\u{1cf42}>𖭯ℶ🛟q", runtime_config: Object {"workers": Number(9614), "storage": Object {"backend": Object {"name": String("default")}, "min_storage_bytes": Null, "min_step_storage_bytes": Null, "compression": String("default"), "cache_mib": Null}, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(true), "tracing": Bool(false), "tracing_endpoint_jaeger": String("\\&🕴"), "min_batch_size_records": Number(9821004243620302662), "max_buffering_delay_usecs": Number(409483829293609342), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Number(2119326179280987330), "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Number(7151796549280257912), "storage_class": Null}, "clock_resolution_usecs": Number(9929288629303979490), "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(true), "dev_tweaks": Object {}}, program_code: "ெ:ඍ𑎄2𝒪]₄&&𑶨<𑃠/🏿ꬵஓP", udf_rust: "=ঊോ🈕স?T𐀇ὙὛ*lૐ>X\u{1e00b}(ȺਠE🕴=Ѩ﷏;1", udf_toml: ".5oy�%ᏺj𖮎𐨓,", program_config: Object {"profile": Null, "cache": Bool(true)} }), TransitProgramStatusToSystemError(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(2), "יּ?!\u{113c7}<%\u{1e023},&ꠑÐ𐑠"), GetPipelineById(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000))]
46+
cc 02de73819c5ea51c45bc2ce97f22c7aab777e3c55cdcc3c0deeaa87e396d5334 # shrinks to [NewPipeline(TenantId(01000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "v0", PipelineDescr { name: "pipeline-3", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), TransitDeploymentStatusToStopping(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(01000000-0000-0000-0000-000000000000), Version(1), None, None), TransitDeploymentStatusToStopped(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(01000000-0000-0000-0000-000000000000), Version(1)), UpdatePipeline(TenantId(01000000-0000-0000-0000-000000000000), "", None, None, "v0", Some(Object {"workers": String("abc")}), None, Some("/¥\u{16ff0}🂿ࡠF𐕶//\\ꬎ?:E¥)6🈐﹛Ⱥ𑤉="), None, None), NewPipeline(TenantId(01000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "v0", PipelineDescr { name: "pipeline-3", description: "{i", runtime_config: Object {"workers": Number(30332), "storage": Object {"backend": Object {"name": String("default")}, "min_storage_bytes": Null, "min_step_storage_bytes": Null, "compression": String("default"), "cache_mib": Null}, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(true), "tracing": Bool(false), "tracing_endpoint_jaeger": String("𑚗ἰѨ&࠵dym,𐑳Y𛲀01ﷇ!୫ᤵⶩ\\𛱶𑍌/ﶟ𐡍"), "min_batch_size_records": Number(15839775634250258066), "max_buffering_delay_usecs": Number(3347010042285470709), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Number(9.39201457952098e162), "memory_mb_min": Number(1246125802016676048), "memory_mb_max": Number(14803455586453162047), "storage_mb_max": Null, "storage_class": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(true), "http_workers": Null, "io_workers": Number(4522380348662173641), "dev_tweaks": Object {}, "logging": Null}, program_code: "ᏠѨ`z8ਘ\"=𐕲/?ⷙ£៖Ⱥ¸VMU", udf_rust: "P]#>\u{1732}\u{11340}m&꧗\"$𐎲ঌ|𑼴%𑜿?`g=🇸୦꩑ணmp", udf_toml: "𐅙>🕴$𖭮𝒗?c𐠼.e$*𞟠\\Ⱥ𞸧'/mz¥ȺѨᦦ.എ\u{1e08f}?𑓙", program_config: Object {"profile": String("unoptimized"), "cache": Bool(false), "runtime_version": Null} }), GetPipelineLifecycleEvents(TenantId(01000000-0000-0000-0000-000000000000), "pipeline-3", 529798559)]

crates/pipeline-manager/src/api.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod demo;
2020
pub mod endpoints;
2121
pub mod error;
2222
mod examples;
23+
pub mod lifecycle_events;
2324
pub mod main;
2425
pub mod support_data_collector;
2526
pub mod util;

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use feldera_types::query_params::MetricsParameters;
2020
use log::{debug, info};
2121
use std::time::Duration;
2222

23+
pub mod lifecycle_events;
2324
pub mod support_bundle;
2425

2526
/// Push data to a SQL table.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use actix_web::{
2+
get,
3+
web::{self, Data as WebData, ReqData},
4+
HttpResponse,
5+
};
6+
7+
use crate::{
8+
api::{examples, main::ServerState},
9+
db::{storage::Storage, types::tenant::TenantId},
10+
error::ManagerError,
11+
};
12+
13+
/// Query parameters for lifecycle events endpoint.
14+
#[derive(serde::Deserialize)]
15+
pub struct EventsParameters {
16+
pub max_events: u32,
17+
}
18+
19+
/// Get lifecycle events for a pipeline.
20+
///
21+
/// Returns a list of lifecycle events for the specified pipeline.
22+
/// The number of events returned is controlled by the `max_events` query parameter.
23+
///
24+
/// # Parameters
25+
/// - `pipeline_name`: Unique pipeline name (path parameter)
26+
/// - `max_events`: Maximum number of events to return (query parameter, required)
27+
///
28+
/// # Returns
29+
/// - 200 OK: List of lifecycle events for the pipeline
30+
/// - 404 NOT_FOUND: Pipeline with that name does not exist
31+
/// - 503 SERVICE_UNAVAILABLE: Disconnected or timeout during response
32+
/// - 500 INTERNAL_SERVER_ERROR: Internal error
33+
#[utoipa::path(
34+
context_path = "/v0",
35+
security(("JSON web token (JWT) or API key" = [])),
36+
params(
37+
("pipeline_name" = String, Path, description = "Unique pipeline name"),
38+
("max_events" = u32, Query, description = "Maximum number of events to return")
39+
),
40+
responses(
41+
(status = OK
42+
, description = "List of lifecycle events for the pipeline"
43+
, content_type = "application/json"
44+
, body = Vec<PipelineLifecycleEvent>),
45+
(status = NOT_FOUND
46+
, description = "Pipeline with that name does not exist"
47+
, body = ErrorResponse
48+
, example = json!(examples::error_unknown_pipeline_name())),
49+
(status = SERVICE_UNAVAILABLE
50+
, body = ErrorResponse
51+
, examples(
52+
("Disconnected during response" = (value = json!(examples::error_pipeline_interaction_disconnected()))),
53+
("Response timeout" = (value = json!(examples::error_pipeline_interaction_timeout())))
54+
)
55+
),
56+
(status = INTERNAL_SERVER_ERROR, body = ErrorResponse),
57+
),
58+
tag = "Pipeline interaction"
59+
)]
60+
#[get("/pipelines/{pipeline_name}/lifecycle_events")]
61+
pub(crate) async fn get_pipeline_lifecycle_events(
62+
state: WebData<ServerState>,
63+
tenant_id: ReqData<TenantId>,
64+
path: web::Path<String>,
65+
query: web::Query<EventsParameters>,
66+
) -> Result<HttpResponse, ManagerError> {
67+
let pipeline_name = path.into_inner();
68+
let max_events = query.max_events;
69+
70+
let events = state
71+
.db
72+
.lock()
73+
.await
74+
.get_pipeline_lifecycle_events(*tenant_id, &pipeline_name, max_events)
75+
.await?;
76+
77+
Ok(HttpResponse::Ok()
78+
.content_type("application/json")
79+
.json(events))
80+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use chrono::NaiveDateTime;
2+
use serde::{Deserialize, Serialize};
3+
use utoipa::ToSchema;
4+
use uuid::Uuid;
5+
6+
use crate::db::types::pipeline::PipelineStatus;
7+
8+
// Pipeline Lifecycle Events
9+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
10+
pub struct PipelineLifecycleEvent {
11+
pub event_id: Uuid,
12+
// pub tenant_id: TenantId,
13+
// pub pipeline_id: PipelineId,
14+
pub deployment_status: PipelineStatus,
15+
pub info: Option<String>,
16+
pub recorded_at: NaiveDateTime,
17+
}

crates/pipeline-manager/src/api/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ It contains the following fields:
208208
endpoints::pipeline_interaction::commit_transaction,
209209
endpoints::pipeline_interaction::get_pipeline_time_series,
210210
endpoints::pipeline_interaction::get_pipeline_time_series_stream,
211+
endpoints::pipeline_interaction::lifecycle_events::get_pipeline_lifecycle_events,
211212
212213
// API keys
213214
endpoints::api_key::list_api_keys,
@@ -254,6 +255,8 @@ It contains the following fields:
254255
crate::api::endpoints::pipeline_management::PostPutPipeline,
255256
crate::api::endpoints::pipeline_management::PatchPipeline,
256257
crate::api::endpoints::pipeline_management::PostStopPipelineParameters,
258+
// Lifecycle Events
259+
crate::api::lifecycle_events::PipelineLifecycleEvent,
257260
258261
// Storage
259262
crate::db::types::storage::StorageStatus,
@@ -482,6 +485,7 @@ fn api_scope() -> Scope {
482485
.service(endpoints::pipeline_interaction::completion_status)
483486
.service(endpoints::pipeline_interaction::start_transaction)
484487
.service(endpoints::pipeline_interaction::commit_transaction)
488+
.service(endpoints::pipeline_interaction::lifecycle_events::get_pipeline_lifecycle_events)
485489
// API keys endpoints
486490
.service(endpoints::api_key::list_api_keys)
487491
.service(endpoints::api_key::get_api_key)

crates/pipeline-manager/src/auth.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ mod test {
726726
enable_https: false,
727727
https_tls_cert_path: None,
728728
https_tls_key_path: None,
729+
lifecycle_events_retention_days: 0,
730+
lifecycle_events_cleanup_frequency_secs: 1,
729731
};
730732

731733
let manager_config = ApiServerConfig {

crates/pipeline-manager/src/compiler/test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ impl CompilerTest {
4646
enable_https: false,
4747
https_tls_cert_path: None,
4848
https_tls_key_path: None,
49+
lifecycle_events_retention_days: 0,
50+
lifecycle_events_cleanup_frequency_secs: 1,
4951
};
5052
let compiler_config = CompilerConfig {
5153
sql_compiler_path:

crates/pipeline-manager/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,14 @@ pub struct CommonConfig {
241241
/// `/path/to/tls.key`).
242242
#[arg(long)]
243243
pub https_tls_key_path: Option<String>,
244+
245+
/// Retention period for pipeline lifecycle events (in days).
246+
#[arg(long, default_value_t = 7)]
247+
pub lifecycle_events_retention_days: u16,
248+
249+
/// Frequency at which pipeline lifecycle events are cleaned up (in seconds).
250+
#[arg(long, default_value_t = 60 * 60)]
251+
pub lifecycle_events_cleanup_frequency_secs: u64,
244252
}
245253

246254
impl CommonConfig {
@@ -388,6 +396,8 @@ impl CommonConfig {
388396
enable_https: false,
389397
https_tls_cert_path: None,
390398
https_tls_key_path: None,
399+
lifecycle_events_retention_days: 7,
400+
lifecycle_events_cleanup_frequency_secs: 60 * 60,
391401
}
392402
}
393403
}

0 commit comments

Comments
 (0)