From dce6a641aa368f9e5ba541c46193de25085b2a6f Mon Sep 17 00:00:00 2001 From: Simon Kassing Date: Wed, 6 May 2026 14:20:32 +0200 Subject: [PATCH] pipeline-manager: option to disable pipeline events collection Ability to disable pipeline event collection via a new option: - CLI argument: `--disable-pipeline-events-collection` - Counterpart environment variable: `FELDERA_DB_DISABLE_PIPELINE_EVENTS_COLLECTION` Signed-off-by: Simon Kassing --- crates/pipeline-manager/src/config.rs | 12 ++- .../src/db/operations/pipeline.rs | 95 +++++++++++++++++-- .../src/db/operations/pipeline_monitor.rs | 5 + .../src/db/storage_postgres.rs | 44 ++++++++- 4 files changed, 145 insertions(+), 11 deletions(-) diff --git a/crates/pipeline-manager/src/config.rs b/crates/pipeline-manager/src/config.rs index c2615beb197..cd31ad3251c 100644 --- a/crates/pipeline-manager/src/config.rs +++ b/crates/pipeline-manager/src/config.rs @@ -641,15 +641,25 @@ pub struct DatabaseConfig { #[serde(default)] #[arg(long, env = "FELDERA_DB_TLS_DISABLE_HOSTNAME_VERIFY")] pub disable_tls_hostname_verify: bool, + + /// Disable pipeline events from getting collected. + /// This does not affect their cleanup. + #[arg(long, env = "FELDERA_DB_DISABLE_PIPELINE_EVENTS_COLLECTION")] + pub disable_pipeline_events_collection: bool, } impl DatabaseConfig { - pub fn new(db_connection_string: String, db_tls_certificate_path: Option) -> Self { + pub fn new( + db_connection_string: String, + db_tls_certificate_path: Option, + disable_pipeline_events_collection: bool, + ) -> Self { Self { db_connection_string, db_tls_certificate_path, disable_tls_verify: false, disable_tls_hostname_verify: false, + disable_pipeline_events_collection, } } diff --git a/crates/pipeline-manager/src/db/operations/pipeline.rs b/crates/pipeline-manager/src/db/operations/pipeline.rs index 38f0c9fc301..80557e21e1b 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline.rs @@ -175,6 +175,7 @@ pub(crate) async fn new_pipeline( new_id: Uuid, platform_version: &str, pipeline: PipelineDescr, + disable_pipeline_events_collection: bool, ) -> Result<(PipelineId, Version), DBError> { validate_name(&pipeline.name)?; // Validate runtime configuration JSON when deserializing it @@ -264,7 +265,14 @@ pub(crate) async fn new_pipeline( .await .map_err(maybe_unique_violation) .map_err(|e| maybe_tenant_id_foreign_key_constraint_err(e, tenant_id))?; - new_pipeline_monitor_event(txn, tenant_id, PipelineId(new_id), Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + PipelineId(new_id), + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok((PipelineId(new_id), Version(1))) } @@ -295,6 +303,7 @@ pub(crate) async fn update_pipeline( udf_rust: &Option, udf_toml: &Option, program_config: &Option, + disable_pipeline_events_collection: bool, ) -> Result { if let Some(name) = name { validate_name(name)?; @@ -543,7 +552,14 @@ pub(crate) async fn update_pipeline( assert_eq!(rows_affected, 1); // The row must exist as it has been retrieved before } - new_pipeline_monitor_event(txn, tenant_id, current.id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + current.id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(Version(current.version.0 + 1)) } @@ -592,6 +608,7 @@ pub(crate) async fn set_program_status( new_program_binary_source_checksum: &Option, new_program_binary_integrity_checksum: &Option, new_program_info_integrity_checksum: &Option, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; @@ -832,7 +849,14 @@ pub(crate) async fn set_program_status( ) .await?; if rows_affected > 0 { - new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + pipeline_id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(()) } else { Err(DBError::UnknownPipeline { pipeline_id }) @@ -853,6 +877,7 @@ pub(crate) async fn dismiss_deployment_error( txn: &Transaction<'_>, tenant_id: TenantId, pipeline_name: &str, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { let current = get_pipeline(txn, tenant_id, pipeline_name).await?; @@ -873,7 +898,14 @@ pub(crate) async fn dismiss_deployment_error( .await?; let modified_rows = txn.execute(&stmt, &[&tenant_id.0, ¤t.id.0]).await?; if modified_rows > 0 { - new_pipeline_monitor_event(txn, tenant_id, current.id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + current.id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(()) } else { Err(DBError::UnknownPipeline { @@ -883,6 +915,7 @@ pub(crate) async fn dismiss_deployment_error( } /// Sets pipeline desired resources status. +#[allow(clippy::too_many_arguments)] pub(crate) async fn set_deployment_resources_desired_status( txn: &Transaction<'_>, tenant_id: TenantId, @@ -891,6 +924,7 @@ pub(crate) async fn set_deployment_resources_desired_status( initial_runtime_desired_status: Option, bootstrap_policy: Option, dismiss_error: bool, + disable_pipeline_events_collection: bool, ) -> Result { let current = get_pipeline(txn, tenant_id, pipeline_name).await?; @@ -1014,7 +1048,14 @@ pub(crate) async fn set_deployment_resources_desired_status( ) .await?; if modified_rows > 0 { - new_pipeline_monitor_event(txn, tenant_id, current.id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + current.id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(current.id) } else { Err(DBError::UnknownPipeline { @@ -1090,6 +1131,7 @@ pub(crate) async fn set_deployment_resources_status_stopped( tenant_id: TenantId, pipeline_id: PipelineId, version_guard: Version, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1118,6 +1160,7 @@ pub(crate) async fn set_deployment_resources_status_stopped( None, None, None, + disable_pipeline_events_collection, ) .await } @@ -1129,6 +1172,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning( version_guard: Version, new_deployment_id: Uuid, new_deployment_config: serde_json::Value, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1157,6 +1201,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning( None, current.deployment_initial, current.bootstrap_policy, + disable_pipeline_events_collection, ) .await } @@ -1167,6 +1212,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioning( pipeline_id: PipelineId, version_guard: Version, new_deployment_resources_status_details: serde_json::Value, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1187,6 +1233,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioning( None, None, None, + disable_pipeline_events_collection, ) .await } @@ -1202,6 +1249,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned( new_deployment_runtime_status: RuntimeStatus, new_deployment_runtime_status_details: serde_json::Value, new_deployment_runtime_desired_status: RuntimeDesiredStatus, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1230,6 +1278,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned( None, current.deployment_initial, current.bootstrap_policy, + disable_pipeline_events_collection, ) .await } @@ -1245,6 +1294,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioned( new_deployment_runtime_status_details: serde_json::Value, new_deployment_runtime_desired_status: RuntimeDesiredStatus, new_storage_status_details: Option, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1265,6 +1315,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioned( Some(new_deployment_runtime_status_details), Some(new_deployment_runtime_desired_status), new_storage_status_details, + disable_pipeline_events_collection, ) .await } @@ -1276,6 +1327,7 @@ pub(crate) async fn set_deployment_resources_status_stopping( version_guard: Version, new_deployment_error: Option, new_storage_status_details: Option, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1304,6 +1356,7 @@ pub(crate) async fn set_deployment_resources_status_stopping( new_storage_status_details, None, None, + disable_pipeline_events_collection, ) .await } @@ -1314,6 +1367,7 @@ pub(crate) async fn remain_deployment_resources_status_stopping( pipeline_id: PipelineId, version_guard: Version, new_deployment_resources_status_details: serde_json::Value, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { check_version_guard_and_transition_deployment_resources_status( txn, @@ -1334,6 +1388,7 @@ pub(crate) async fn remain_deployment_resources_status_stopping( None, None, None, + disable_pipeline_events_collection, ) .await } @@ -1360,6 +1415,7 @@ async fn set_deployment_resources_status( new_storage_status_details: Option, final_deployment_initial: Option, final_bootstrap_policy: Option, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { // Validate that the new or existing deployment configuration is valid if let Some(deployment_config) = &final_deployment_config { @@ -1429,7 +1485,14 @@ async fn set_deployment_resources_status( ) .await?; if rows_affected > 0 { - new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + pipeline_id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(()) } else { Err(DBError::UnknownPipeline { pipeline_id }) @@ -1451,6 +1514,7 @@ async fn remain_deployment_resources_status( new_deployment_runtime_status_details: Option, new_deployment_runtime_desired_status: Option, new_storage_status_details: Option, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { // Validate that the new storage status details is valid if let Some(storage_status_details) = &new_storage_status_details { @@ -1495,7 +1559,14 @@ async fn remain_deployment_resources_status( ) .await?; if rows_affected > 0 { - new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + pipeline_id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(()) } else { Err(DBError::UnknownPipeline { pipeline_id }) @@ -1508,6 +1579,7 @@ pub(crate) async fn set_storage_status( tenant_id: TenantId, pipeline_id: PipelineId, new_storage_status: StorageStatus, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; @@ -1549,7 +1621,14 @@ pub(crate) async fn set_storage_status( ) .await?; if rows_affected > 0 { - new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?; + new_pipeline_monitor_event( + txn, + tenant_id, + pipeline_id, + Uuid::now_v7(), + disable_pipeline_events_collection, + ) + .await?; Ok(()) } else { Err(DBError::UnknownPipeline { pipeline_id }) diff --git a/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs b/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs index e5454a2e690..5f746fba9ee 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs @@ -184,7 +184,12 @@ pub(crate) async fn new_pipeline_monitor_event( tenant_id: TenantId, pipeline_id: PipelineId, new_event_id: Uuid, + disable_pipeline_events_collection: bool, ) -> Result<(), DBError> { + if disable_pipeline_events_collection { + // No new monitor event is created if collection is disabled. + return Ok(()); + } let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id).await?; let stmt = txn diff --git a/crates/pipeline-manager/src/db/storage_postgres.rs b/crates/pipeline-manager/src/db/storage_postgres.rs index 35296cc1ae9..63869747f23 100644 --- a/crates/pipeline-manager/src/db/storage_postgres.rs +++ b/crates/pipeline-manager/src/db/storage_postgres.rs @@ -293,6 +293,7 @@ impl Storage for StoragePostgres { new_id, platform_version, pipeline.clone(), + self.db_config.disable_pipeline_events_collection, ) .await?; @@ -335,6 +336,7 @@ impl Storage for StoragePostgres { &Some(pipeline.udf_rust.clone()), &Some(pipeline.udf_toml.clone()), &Some(pipeline.program_config.clone()), + self.db_config.disable_pipeline_events_collection, ) .await?; false @@ -350,6 +352,7 @@ impl Storage for StoragePostgres { new_id, platform_version, pipeline.clone(), + self.db_config.disable_pipeline_events_collection, ) .await?; true @@ -447,6 +450,7 @@ impl Storage for StoragePostgres { udf_rust, udf_toml, program_config, + self.db_config.disable_pipeline_events_collection, ) .await?; @@ -493,6 +497,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -520,6 +525,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -549,6 +555,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -576,6 +583,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -607,6 +615,7 @@ impl Storage for StoragePostgres { &Some(program_binary_source_checksum.to_string()), &Some(program_binary_integrity_checksum.to_string()), &Some(program_info_integrity_checksum.to_string()), + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -635,6 +644,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -663,6 +673,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -691,6 +702,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -704,7 +716,13 @@ impl Storage for StoragePostgres { ) -> Result<(), DBError> { let mut client = self.pool.get().await?; let txn = client.transaction().await?; - operations::pipeline::dismiss_deployment_error(&txn, tenant_id, pipeline_name).await?; + operations::pipeline::dismiss_deployment_error( + &txn, + tenant_id, + pipeline_name, + self.db_config.disable_pipeline_events_collection, + ) + .await?; txn.commit().await?; Ok(()) } @@ -727,6 +745,7 @@ impl Storage for StoragePostgres { Some(initial), Some(bootstrap_policy), dismiss_error, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -748,6 +767,7 @@ impl Storage for StoragePostgres { None, None, false, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -773,6 +793,7 @@ impl Storage for StoragePostgres { None, None, false, + self.db_config.disable_pipeline_events_collection, ) .await?; true @@ -812,6 +833,7 @@ impl Storage for StoragePostgres { tenant_id, pipeline_id, StorageStatus::InUse, + self.db_config.disable_pipeline_events_collection, ) .await?; operations::pipeline::set_deployment_resources_status_provisioning( @@ -821,6 +843,7 @@ impl Storage for StoragePostgres { version_guard, deployment_id, deployment_config, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -842,6 +865,7 @@ impl Storage for StoragePostgres { pipeline_id, version_guard, deployment_resources_status_details, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -871,6 +895,7 @@ impl Storage for StoragePostgres { deployment_runtime_status, deployment_runtime_status_details, deployment_runtime_desired_status, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -900,6 +925,7 @@ impl Storage for StoragePostgres { deployment_runtime_status_details, deployment_runtime_desired_status, storage_status_details, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -938,6 +964,7 @@ impl Storage for StoragePostgres { None, None, false, + self.db_config.disable_pipeline_events_collection, ) .await?; operations::pipeline::set_deployment_resources_status_stopping( @@ -947,6 +974,7 @@ impl Storage for StoragePostgres { version_guard, deployment_error, storage_status_details, + self.db_config.disable_pipeline_events_collection, ) .await?; @@ -969,6 +997,7 @@ impl Storage for StoragePostgres { pipeline_id, version_guard, deployment_resources_status_details, + self.db_config.disable_pipeline_events_collection, ) .await?; @@ -989,6 +1018,7 @@ impl Storage for StoragePostgres { tenant_id, pipeline_id, version_guard, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -1012,6 +1042,7 @@ impl Storage for StoragePostgres { tenant_id, pipeline.id, StorageStatus::Clearing, + self.db_config.disable_pipeline_events_collection, ) .await?; } @@ -1031,6 +1062,7 @@ impl Storage for StoragePostgres { tenant_id, pipeline_id, StorageStatus::Cleared, + self.db_config.disable_pipeline_events_collection, ) .await?; txn.commit().await?; @@ -1104,6 +1136,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; } @@ -1124,6 +1157,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; } @@ -1193,6 +1227,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; } @@ -1213,6 +1248,7 @@ impl Storage for StoragePostgres { &None, &None, &None, + self.db_config.disable_pipeline_events_collection, ) .await?; } @@ -1498,7 +1534,11 @@ impl StoragePostgres { } let connection_string = pg_inst.settings().url(db_name); - let db_config = DatabaseConfig::new(connection_string, None); + let db_config = DatabaseConfig::new( + connection_string, + None, + db_config.disable_pipeline_events_collection, + ); return Self::initialize(&db_config, Some(pg_inst)).await; };