Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/pipeline-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,25 @@ pub struct DatabaseConfig {
#[serde(default)]
#[arg(long, env = "FELDERA_DB_TLS_DISABLE_HOSTNAME_VERIFY")]
pub disable_tls_hostname_verify: bool,

/// Disable pipeline events from getting collected.
/// This does not affect their cleanup.
#[arg(long, env = "FELDERA_DB_DISABLE_PIPELINE_EVENTS_COLLECTION")]
pub disable_pipeline_events_collection: bool,
}

impl DatabaseConfig {
pub fn new(db_connection_string: String, db_tls_certificate_path: Option<String>) -> Self {
pub fn new(
db_connection_string: String,
db_tls_certificate_path: Option<String>,
disable_pipeline_events_collection: bool,
) -> Self {
Self {
db_connection_string,
db_tls_certificate_path,
disable_tls_verify: false,
disable_tls_hostname_verify: false,
disable_pipeline_events_collection,
}
}

Expand Down
95 changes: 87 additions & 8 deletions crates/pipeline-manager/src/db/operations/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub(crate) async fn new_pipeline(
new_id: Uuid,
platform_version: &str,
pipeline: PipelineDescr,
disable_pipeline_events_collection: bool,
) -> Result<(PipelineId, Version), DBError> {
validate_name(&pipeline.name)?;
// Validate runtime configuration JSON when deserializing it
Expand Down Expand Up @@ -264,7 +265,14 @@ pub(crate) async fn new_pipeline(
.await
.map_err(maybe_unique_violation)
.map_err(|e| maybe_tenant_id_foreign_key_constraint_err(e, tenant_id))?;
new_pipeline_monitor_event(txn, tenant_id, PipelineId(new_id), Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
PipelineId(new_id),
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok((PipelineId(new_id), Version(1)))
}

Expand Down Expand Up @@ -295,6 +303,7 @@ pub(crate) async fn update_pipeline(
udf_rust: &Option<String>,
udf_toml: &Option<String>,
program_config: &Option<serde_json::Value>,
disable_pipeline_events_collection: bool,
) -> Result<Version, DBError> {
if let Some(name) = name {
validate_name(name)?;
Expand Down Expand Up @@ -543,7 +552,14 @@ pub(crate) async fn update_pipeline(
assert_eq!(rows_affected, 1); // The row must exist as it has been retrieved before
}

new_pipeline_monitor_event(txn, tenant_id, current.id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
current.id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(Version(current.version.0 + 1))
}

Expand Down Expand Up @@ -592,6 +608,7 @@ pub(crate) async fn set_program_status(
new_program_binary_source_checksum: &Option<String>,
new_program_binary_integrity_checksum: &Option<String>,
new_program_info_integrity_checksum: &Option<String>,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;

Expand Down Expand Up @@ -832,7 +849,14 @@ pub(crate) async fn set_program_status(
)
.await?;
if rows_affected > 0 {
new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
pipeline_id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(())
} else {
Err(DBError::UnknownPipeline { pipeline_id })
Expand All @@ -853,6 +877,7 @@ pub(crate) async fn dismiss_deployment_error(
txn: &Transaction<'_>,
tenant_id: TenantId,
pipeline_name: &str,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
let current = get_pipeline(txn, tenant_id, pipeline_name).await?;

Expand All @@ -873,7 +898,14 @@ pub(crate) async fn dismiss_deployment_error(
.await?;
let modified_rows = txn.execute(&stmt, &[&tenant_id.0, &current.id.0]).await?;
if modified_rows > 0 {
new_pipeline_monitor_event(txn, tenant_id, current.id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
current.id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(())
} else {
Err(DBError::UnknownPipeline {
Expand All @@ -883,6 +915,7 @@ pub(crate) async fn dismiss_deployment_error(
}

/// Sets pipeline desired resources status.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn set_deployment_resources_desired_status(
txn: &Transaction<'_>,
tenant_id: TenantId,
Expand All @@ -891,6 +924,7 @@ pub(crate) async fn set_deployment_resources_desired_status(
initial_runtime_desired_status: Option<RuntimeDesiredStatus>,
bootstrap_policy: Option<BootstrapPolicy>,
dismiss_error: bool,
disable_pipeline_events_collection: bool,
) -> Result<PipelineId, DBError> {
let current = get_pipeline(txn, tenant_id, pipeline_name).await?;

Expand Down Expand Up @@ -1014,7 +1048,14 @@ pub(crate) async fn set_deployment_resources_desired_status(
)
.await?;
if modified_rows > 0 {
new_pipeline_monitor_event(txn, tenant_id, current.id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
current.id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(current.id)
} else {
Err(DBError::UnknownPipeline {
Expand Down Expand Up @@ -1090,6 +1131,7 @@ pub(crate) async fn set_deployment_resources_status_stopped(
tenant_id: TenantId,
pipeline_id: PipelineId,
version_guard: Version,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1118,6 +1160,7 @@ pub(crate) async fn set_deployment_resources_status_stopped(
None,
None,
None,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1129,6 +1172,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning(
version_guard: Version,
new_deployment_id: Uuid,
new_deployment_config: serde_json::Value,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1157,6 +1201,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning(
None,
current.deployment_initial,
current.bootstrap_policy,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1167,6 +1212,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioning(
pipeline_id: PipelineId,
version_guard: Version,
new_deployment_resources_status_details: serde_json::Value,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand All @@ -1187,6 +1233,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioning(
None,
None,
None,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1202,6 +1249,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned(
new_deployment_runtime_status: RuntimeStatus,
new_deployment_runtime_status_details: serde_json::Value,
new_deployment_runtime_desired_status: RuntimeDesiredStatus,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1230,6 +1278,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned(
None,
current.deployment_initial,
current.bootstrap_policy,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1245,6 +1294,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioned(
new_deployment_runtime_status_details: serde_json::Value,
new_deployment_runtime_desired_status: RuntimeDesiredStatus,
new_storage_status_details: Option<serde_json::Value>,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand All @@ -1265,6 +1315,7 @@ pub(crate) async fn remain_deployment_resources_status_provisioned(
Some(new_deployment_runtime_status_details),
Some(new_deployment_runtime_desired_status),
new_storage_status_details,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1276,6 +1327,7 @@ pub(crate) async fn set_deployment_resources_status_stopping(
version_guard: Version,
new_deployment_error: Option<ErrorResponse>,
new_storage_status_details: Option<serde_json::Value>,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1304,6 +1356,7 @@ pub(crate) async fn set_deployment_resources_status_stopping(
new_storage_status_details,
None,
None,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1314,6 +1367,7 @@ pub(crate) async fn remain_deployment_resources_status_stopping(
pipeline_id: PipelineId,
version_guard: Version,
new_deployment_resources_status_details: serde_json::Value,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
check_version_guard_and_transition_deployment_resources_status(
txn,
Expand All @@ -1334,6 +1388,7 @@ pub(crate) async fn remain_deployment_resources_status_stopping(
None,
None,
None,
disable_pipeline_events_collection,
)
.await
}
Expand All @@ -1360,6 +1415,7 @@ async fn set_deployment_resources_status(
new_storage_status_details: Option<serde_json::Value>,
final_deployment_initial: Option<RuntimeDesiredStatus>,
final_bootstrap_policy: Option<BootstrapPolicy>,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
// Validate that the new or existing deployment configuration is valid
if let Some(deployment_config) = &final_deployment_config {
Expand Down Expand Up @@ -1429,7 +1485,14 @@ async fn set_deployment_resources_status(
)
.await?;
if rows_affected > 0 {
new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
pipeline_id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(())
} else {
Err(DBError::UnknownPipeline { pipeline_id })
Expand All @@ -1451,6 +1514,7 @@ async fn remain_deployment_resources_status(
new_deployment_runtime_status_details: Option<serde_json::Value>,
new_deployment_runtime_desired_status: Option<RuntimeDesiredStatus>,
new_storage_status_details: Option<serde_json::Value>,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
// Validate that the new storage status details is valid
if let Some(storage_status_details) = &new_storage_status_details {
Expand Down Expand Up @@ -1495,7 +1559,14 @@ async fn remain_deployment_resources_status(
)
.await?;
if rows_affected > 0 {
new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
pipeline_id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(())
} else {
Err(DBError::UnknownPipeline { pipeline_id })
Expand All @@ -1508,6 +1579,7 @@ pub(crate) async fn set_storage_status(
tenant_id: TenantId,
pipeline_id: PipelineId,
new_storage_status: StorageStatus,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;

Expand Down Expand Up @@ -1549,7 +1621,14 @@ pub(crate) async fn set_storage_status(
)
.await?;
if rows_affected > 0 {
new_pipeline_monitor_event(txn, tenant_id, pipeline_id, Uuid::now_v7()).await?;
new_pipeline_monitor_event(
txn,
tenant_id,
pipeline_id,
Uuid::now_v7(),
disable_pipeline_events_collection,
)
.await?;
Ok(())
} else {
Err(DBError::UnknownPipeline { pipeline_id })
Expand Down
5 changes: 5 additions & 0 deletions crates/pipeline-manager/src/db/operations/pipeline_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,12 @@ pub(crate) async fn new_pipeline_monitor_event(
tenant_id: TenantId,
pipeline_id: PipelineId,
new_event_id: Uuid,
disable_pipeline_events_collection: bool,
) -> Result<(), DBError> {
if disable_pipeline_events_collection {
// No new monitor event is created if collection is disabled.
return Ok(());
}
let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id).await?;

let stmt = txn
Expand Down
Loading
Loading