diff --git a/crates/pipeline-manager/src/db/operations/pipeline.rs b/crates/pipeline-manager/src/db/operations/pipeline.rs index febecc99256..2e62800f36b 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline.rs @@ -82,13 +82,16 @@ pub(crate) async fn get_pipeline( txn: &Transaction<'_>, tenant_id: TenantId, name: &str, + row_lock: bool, ) -> Result { let stmt = txn .prepare_cached(&format!( "SELECT {PIPELINE_COLUMNS_ALL} FROM pipeline AS p WHERE p.tenant_id = $1 AND p.name = $2 - " + {} + ", + if row_lock { "FOR UPDATE" } else { "" } )) .await?; let row = txn.query_opt(&stmt, &[&tenant_id.0, &name]).await?.ok_or( @@ -103,13 +106,16 @@ pub(crate) async fn get_pipeline_for_monitoring( txn: &Transaction<'_>, tenant_id: TenantId, name: &str, + row_lock: bool, ) -> Result { let stmt = txn .prepare_cached(&format!( "SELECT {PIPELINE_COLUMNS_MONITORING} FROM pipeline AS p WHERE p.tenant_id = $1 AND p.name = $2 - " + {} + ", + if row_lock { "FOR UPDATE" } else { "" } )) .await?; let row = txn.query_opt(&stmt, &[&tenant_id.0, &name]).await?.ok_or( @@ -125,13 +131,16 @@ async fn internal_get_pipeline_by_id( tenant_id: TenantId, pipeline_id: PipelineId, fields: &'static str, + row_lock: bool, ) -> Result { let stmt = txn .prepare_cached(&format!( "SELECT {fields} FROM pipeline AS p WHERE p.tenant_id = $1 AND p.id = $2 - " + {} + ", + if row_lock { "FOR UPDATE" } else { "" } )) .await?; txn.query_opt(&stmt, &[&tenant_id.0, &pipeline_id.0]) @@ -143,9 +152,11 @@ pub async fn get_pipeline_by_id( txn: &Transaction<'_>, tenant_id: TenantId, pipeline_id: PipelineId, + row_lock: bool, ) -> Result { let row = - internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_ALL).await?; + internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_ALL, row_lock) + .await?; parse_pipeline_row_all(&row) } @@ -153,9 +164,16 @@ pub async fn get_pipeline_by_id_for_monitoring( txn: 
&Transaction<'_>, tenant_id: TenantId, pipeline_id: PipelineId, + row_lock: bool, ) -> Result { - let row = internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_MONITORING) - .await?; + let row = internal_get_pipeline_by_id( + txn, + tenant_id, + pipeline_id, + PIPELINE_COLUMNS_MONITORING, + row_lock, + ) + .await?; parse_pipeline_row_monitoring(&row) } @@ -163,9 +181,16 @@ pub async fn get_pipeline_by_id_for_event_info( txn: &Transaction<'_>, tenant_id: TenantId, pipeline_id: PipelineId, + row_lock: bool, ) -> Result { - let row = internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_EVENT_INFO) - .await?; + let row = internal_get_pipeline_by_id( + txn, + tenant_id, + pipeline_id, + PIPELINE_COLUMNS_EVENT_INFO, + row_lock, + ) + .await?; parse_pipeline_row_event_info(&row) } @@ -342,7 +367,7 @@ pub(crate) async fn update_pipeline( // Fetch current pipeline to decide how to update. // This will also return an error if the pipeline does not exist. 
- let current = get_pipeline(txn, tenant_id, original_name).await?; + let current = get_pipeline(txn, tenant_id, original_name, true).await?; // Pipeline update is allowed if either: // - Current status is `Stopped` AND desired status is `Stopped` @@ -570,7 +595,7 @@ pub(crate) async fn delete_pipeline( tenant_id: TenantId, name: &str, ) -> Result { - let current = get_pipeline(txn, tenant_id, name).await?; + let current = get_pipeline(txn, tenant_id, name, true).await?; // Pipeline deletion is only possible if it is fully stopped if current.deployment_resources_status != ResourcesStatus::Stopped @@ -611,7 +636,7 @@ pub(crate) async fn set_program_status( new_program_binary_integrity_checksum: &Option, new_program_info_integrity_checksum: &Option, ) -> Result<(), DBError> { - let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; // Only if the program whose status is being transitioned is the same one can it be updated if current.program_version != program_version_guard { @@ -874,7 +899,7 @@ pub(crate) async fn dismiss_deployment_error( tenant_id: TenantId, pipeline_name: &str, ) -> Result<(), DBError> { - let current = get_pipeline(txn, tenant_id, pipeline_name).await?; + let current = get_pipeline(txn, tenant_id, pipeline_name, true).await?; // If an error has to be cleared, then it is only possible if it is fully stopped if (current.deployment_resources_status != ResourcesStatus::Stopped @@ -912,7 +937,7 @@ pub(crate) async fn set_deployment_resources_desired_status( bootstrap_config: Option, dismiss_error: bool, ) -> Result { - let current = get_pipeline(txn, tenant_id, pipeline_name).await?; + let current = get_pipeline(txn, tenant_id, pipeline_name, true).await?; // Deployment error will be automatically dismissed if this is wanted let final_deployment_error = if dismiss_error { @@ -1054,7 +1079,7 @@ async fn 
check_version_guard_and_transition_deployment_resources_status( new_deployment_resources_status: ResourcesStatus, remain: bool, ) -> Result { - let current = get_pipeline_by_id_for_monitoring(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id_for_monitoring(txn, tenant_id, pipeline_id, true).await?; // Use the version guard to check that the deployment is the intended one if current.version != version_guard { @@ -1124,7 +1149,7 @@ pub(crate) async fn set_deployment_resources_status_stopped( false, ) .await?; - let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; set_deployment_resources_status( txn, @@ -1200,7 +1225,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning( false, ) .await?; - let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; set_deployment_resources_status( txn, @@ -1273,7 +1298,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned( false, ) .await?; - let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; set_deployment_resources_status( txn, @@ -1347,7 +1372,7 @@ pub(crate) async fn set_deployment_resources_status_stopping( false, ) .await?; - let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; set_deployment_resources_status( txn, @@ -1570,7 +1595,7 @@ pub(crate) async fn set_storage_status( pipeline_id: PipelineId, new_storage_status: StorageStatus, ) -> Result<(), DBError> { - let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; + let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; // Check that the transition is permitted validate_storage_status_transition( @@ -1686,6 +1711,96 @@ 
pub(crate) async fn list_pipelines_across_all_tenants_for_monitoring( Ok(result) } +/// Retrieves a list of pipelines across all tenants that are candidates to have their SQL +/// compilation cleared. +pub(crate) async fn list_pipelines_across_all_tenants_clear_sql_compilation( + txn: &Transaction<'_>, + platform_version: &str, + worker_id: usize, + total_workers: usize, +) -> Result, DBError> { + // See `get_next_sql_compilation` for the worker id matching function explanation. + let stmt = txn + .prepare_cached(&format!( + "SELECT p.tenant_id, {PIPELINE_COLUMNS_MONITORING} + FROM pipeline AS p + WHERE p.deployment_resources_status = 'stopped' + AND ( + (p.platform_version = $1 AND p.program_status = 'compiling_sql') + OR + (p.platform_version != $1 AND (p.program_status = 'pending' OR p.program_status = 'compiling_sql')) + ) + AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3 + ORDER BY p.id ASC + FOR UPDATE + " + )) + .await?; + let rows: Vec = txn + .query( + &stmt, + &[ + &platform_version.to_string(), + &(total_workers as i64), + &(worker_id as i64), + ], + ) + .await?; + let mut result = Vec::with_capacity(rows.len()); + for row in rows { + result.push(( + TenantId(row.get("tenant_id")), + parse_pipeline_row_monitoring(&row)?, + )); + } + Ok(result) +} + +/// Retrieves a list of pipelines across all tenants that are candidates to have their Rust +/// compilation cleared. +pub(crate) async fn list_pipelines_across_all_tenants_clear_rust_compilation( + txn: &Transaction<'_>, + platform_version: &str, + worker_id: usize, + total_workers: usize, +) -> Result, DBError> { + // See `get_next_sql_compilation` for the worker id matching function explanation. 
+ let stmt = txn + .prepare_cached(&format!( + "SELECT p.tenant_id, {PIPELINE_COLUMNS_MONITORING} + FROM pipeline AS p + WHERE p.deployment_resources_status = 'stopped' + AND ( + (p.platform_version = $1 AND p.program_status = 'compiling_rust') + OR + (p.platform_version != $1 AND (p.program_status = 'sql_compiled' OR p.program_status = 'compiling_rust')) + ) + AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3 + ORDER BY p.id ASC + FOR UPDATE + " + )) + .await?; + let rows: Vec = txn + .query( + &stmt, + &[ + &platform_version.to_string(), + &(total_workers as i64), + &(worker_id as i64), + ], + ) + .await?; + let mut result = Vec::with_capacity(rows.len()); + for row in rows { + result.push(( + TenantId(row.get("tenant_id")), + parse_pipeline_row_monitoring(&row)?, + )); + } + Ok(result) +} + /// Retrieves the pipeline which is stopped, whose program status has been Pending /// for the longest, and is of the current platform version. Returns `None` if none is found. pub(crate) async fn get_next_sql_compilation( diff --git a/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs b/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs index e5454a2e690..deab05c97cf 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline_monitor.rs @@ -25,7 +25,7 @@ pub(crate) async fn get_pipeline_monitor_event_short( pipeline_name: String, event_id: PipelineMonitorEventId, ) -> Result { - let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) + let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) .await? 
.id; @@ -51,7 +51,7 @@ pub(crate) async fn get_pipeline_monitor_event_extended( pipeline_name: String, event_id: PipelineMonitorEventId, ) -> Result { - let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) + let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) .await? .id; @@ -76,7 +76,7 @@ pub(crate) async fn get_latest_pipeline_monitor_event_short( tenant_id: TenantId, pipeline_name: String, ) -> Result { - let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) + let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) .await? .id; @@ -103,7 +103,7 @@ pub(crate) async fn get_latest_pipeline_monitor_event_extended( tenant_id: TenantId, pipeline_name: String, ) -> Result { - let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) + let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) .await? .id; @@ -130,7 +130,7 @@ pub(crate) async fn list_pipeline_monitor_events_short( tenant_id: TenantId, pipeline_name: String, ) -> Result, DBError> { - let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) + let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) .await? .id; @@ -157,7 +157,7 @@ pub(crate) async fn list_pipeline_monitor_events_extended( tenant_id: TenantId, pipeline_name: String, ) -> Result, DBError> { - let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) + let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) .await? 
.id; @@ -185,7 +185,7 @@ pub(crate) async fn new_pipeline_monitor_event( pipeline_id: PipelineId, new_event_id: Uuid, ) -> Result<(), DBError> { - let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id).await?; + let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id, true).await?; let stmt = txn .prepare_cached( diff --git a/crates/pipeline-manager/src/db/storage_postgres.rs b/crates/pipeline-manager/src/db/storage_postgres.rs index f787c4ac942..9223ded6ce4 100644 --- a/crates/pipeline-manager/src/db/storage_postgres.rs +++ b/crates/pipeline-manager/src/db/storage_postgres.rs @@ -29,13 +29,14 @@ use deadpool_postgres::{Manager, Pool, RecyclingMethod}; use feldera_types::config::{PipelineConfig, RuntimeConfig}; use feldera_types::error::ErrorResponse; use feldera_types::runtime_status::{BootstrapConfig, RuntimeDesiredStatus, RuntimeStatus}; -use tokio_postgres::Row; +use tokio_postgres::{IsolationLevel, Row}; use tracing::{debug, info}; use uuid::Uuid; // Convert PipelineId UUID to u64 to match PostgreSQL's behavior. // This uses the first 8 bytes of the UUID converted to a big-endian u64. // This ensures consistent worker assignment between Rust and SQL implementations. +#[allow(dead_code)] // Might be useful in the future to have equivalent of SQL implementation fn pipline_uuid_to_u64(pipeline_id: PipelineId) -> u64 { let bytes = pipeline_id.0.as_bytes(); u64::from_be_bytes([ @@ -45,6 +46,7 @@ fn pipline_uuid_to_u64(pipeline_id: PipelineId) -> u64 { /// Determine if a pipeline is assigned to a specific worker based on its ID. /// Uses modulo operation to distribute pipelines across workers. 
+#[allow(dead_code)] // Might be useful in the future to have equivalent of SQL implementation pub(crate) fn is_pipeline_assigned_to_worker( pipeline_id: PipelineId, worker_index: u64, @@ -188,7 +190,7 @@ impl Storage for StoragePostgres { ) -> Result { let mut client = self.pool.get().await?; let txn = client.transaction().await?; - let pipeline = operations::pipeline::get_pipeline(&txn, tenant_id, name).await?; + let pipeline = operations::pipeline::get_pipeline(&txn, tenant_id, name, false).await?; txn.commit().await?; Ok(pipeline) } @@ -201,7 +203,7 @@ impl Storage for StoragePostgres { let mut client = self.pool.get().await?; let txn = client.transaction().await?; let pipeline = - operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, name).await?; + operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, name, false).await?; txn.commit().await?; Ok(pipeline) } @@ -214,7 +216,7 @@ impl Storage for StoragePostgres { let mut client = self.pool.get().await?; let txn = client.transaction().await?; let pipeline = - operations::pipeline::get_pipeline_by_id(&txn, tenant_id, pipeline_id).await?; + operations::pipeline::get_pipeline_by_id(&txn, tenant_id, pipeline_id, false).await?; txn.commit().await?; Ok(pipeline) } @@ -226,9 +228,13 @@ impl Storage for StoragePostgres { ) -> Result { let mut client = self.pool.get().await?; let txn = client.transaction().await?; - let pipeline = - operations::pipeline::get_pipeline_by_id_for_monitoring(&txn, tenant_id, pipeline_id) - .await?; + let pipeline = operations::pipeline::get_pipeline_by_id_for_monitoring( + &txn, + tenant_id, + pipeline_id, + false, + ) + .await?; txn.commit().await?; Ok(pipeline) } @@ -241,10 +247,19 @@ impl Storage for StoragePostgres { provision_called: bool, ) -> Result { let mut client = self.pool.get().await?; - let txn = client.transaction().await?; - let pipeline_monitoring = - operations::pipeline::get_pipeline_by_id_for_monitoring(&txn, tenant_id, pipeline_id) - .await?; + 
let txn = client + .build_transaction() + .isolation_level(IsolationLevel::RepeatableRead) + .read_only(true) + .start() + .await?; + let pipeline_monitoring = operations::pipeline::get_pipeline_by_id_for_monitoring( + &txn, + tenant_id, + pipeline_id, + false, + ) + .await?; let is_ready_compiled = pipeline_monitoring.program_status == ProgramStatus::Success && is_supported_runtime(platform_version, &pipeline_monitoring.platform_version); let pipeline_result = if matches!( @@ -267,7 +282,8 @@ impl Storage for StoragePostgres { ), ) { ExtendedPipelineDescrRunner::Complete( - operations::pipeline::get_pipeline_by_id(&txn, tenant_id, pipeline_id).await?, + operations::pipeline::get_pipeline_by_id(&txn, tenant_id, pipeline_id, false) + .await?, ) } else { ExtendedPipelineDescrRunner::Monitoring(pipeline_monitoring) @@ -298,7 +314,7 @@ impl Storage for StoragePostgres { // Fetch newly created pipeline let extended_pipeline = - operations::pipeline::get_pipeline(&txn, tenant_id, &pipeline.name).await?; + operations::pipeline::get_pipeline(&txn, tenant_id, &pipeline.name, false).await?; txn.commit().await?; Ok(extended_pipeline) @@ -317,7 +333,8 @@ impl Storage for StoragePostgres { let txn = client.transaction().await?; // Check if pipeline exists - let current = operations::pipeline::get_pipeline(&txn, tenant_id, original_name).await; + let current = + operations::pipeline::get_pipeline(&txn, tenant_id, original_name, true).await; let is_new: bool = match current { Ok(_) => { // Pipeline already exists, as such update it @@ -363,7 +380,7 @@ impl Storage for StoragePostgres { // Fetch new or updated pipeline let extended_pipeline = - operations::pipeline::get_pipeline(&txn, tenant_id, &pipeline.name).await?; + operations::pipeline::get_pipeline(&txn, tenant_id, &pipeline.name, false).await?; txn.commit().await?; Ok((is_new, extended_pipeline)) @@ -380,7 +397,7 @@ impl Storage for StoragePostgres { // Check if pipeline exists let current = - 
+ operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, pipeline_name, true) .await?; if current.deployment_resources_status != ResourcesStatus::Stopped { @@ -391,12 +408,15 @@ impl Storage for StoragePostgres { .prepare_cached( "UPDATE pipeline SET platform_version = $1 - WHERE id = $2", + WHERE tenant_id = $2 AND id = $3", ) .await?; let rows_affected = txn - .execute(&stmt_update, &[&platform_version, &current.id.0]) + .execute( + &stmt_update, + &[&platform_version, &tenant_id.0, &current.id.0], + ) .await?; assert_eq!(rows_affected, 1); @@ -453,7 +473,7 @@ impl Storage for StoragePostgres { // Fetch updated pipeline let final_name = name.clone().unwrap_or(original_name.to_string()); let extended_pipeline = - operations::pipeline::get_pipeline(&txn, tenant_id, &final_name).await?; + operations::pipeline::get_pipeline(&txn, tenant_id, &final_name, false).await?; txn.commit().await?; Ok(extended_pipeline) @@ -762,7 +782,7 @@ impl Storage for StoragePostgres { let mut client = self.pool.get().await?; let txn = client.transaction().await?; let pipeline = - operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, pipeline_name) + operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, pipeline_name, true) .await?; let was_set = if pipeline.deployment_resources_status != ResourcesStatus::Provisioned { operations::pipeline::set_deployment_resources_desired_status( @@ -795,9 +815,13 @@ impl Storage for StoragePostgres { let txn = client.transaction().await?; // If the pipeline currently is already Stopped and is desired to be Stopped, // then there is no need to transition to Provisioning - let pipeline = - operations::pipeline::get_pipeline_by_id_for_monitoring(&txn, tenant_id, pipeline_id) - .await?; + let pipeline = operations::pipeline::get_pipeline_by_id_for_monitoring( + &txn, + tenant_id, + pipeline_id, + true, + ) + .await?; if 
pipeline.deployment_resources_status == ResourcesStatus::Stopped && pipeline.deployment_resources_desired_status == ResourcesDesiredStatus::Stopped { @@ -916,9 +940,13 @@ impl Storage for StoragePostgres { ) -> Result<(), DBError> { let mut client = self.pool.get().await?; let txn = client.transaction().await?; - let pipeline = - operations::pipeline::get_pipeline_by_id_for_monitoring(&txn, tenant_id, pipeline_id) - .await?; + let pipeline = operations::pipeline::get_pipeline_by_id_for_monitoring( + &txn, + tenant_id, + pipeline_id, + true, + ) + .await?; // If the pipeline currently is already Stopped and is desired to be Stopped, // then there is no need to transition to Stopping if pipeline.deployment_resources_status == ResourcesStatus::Stopped @@ -1004,9 +1032,13 @@ impl Storage for StoragePostgres { ) -> Result<(), DBError> { let mut client = self.pool.get().await?; let txn = client.transaction().await?; - let pipeline = - operations::pipeline::get_pipeline_by_id_for_monitoring(&txn, tenant_id, pipeline_id) - .await?; + let pipeline = operations::pipeline::get_pipeline_by_id_for_monitoring( + &txn, + tenant_id, + pipeline_id, + true, + ) + .await?; operations::pipeline::set_deployment_resources_desired_status( &txn, tenant_id, @@ -1037,7 +1069,7 @@ impl Storage for StoragePostgres { let mut client = self.pool.get().await?; let txn = client.transaction().await?; let pipeline = - operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, pipeline_name) + operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, pipeline_name, true) .await?; if pipeline.storage_status != StorageStatus::Cleared { // If it is already cleared, it does not have to transition to clearing @@ -1114,53 +1146,47 @@ impl Storage for StoragePostgres { let mut client = self.pool.get().await?; let txn = client.transaction().await?; let pipelines = - operations::pipeline::list_pipelines_across_all_tenants_for_monitoring(&txn).await?; + 
operations::pipeline::list_pipelines_across_all_tenants_clear_sql_compilation( + &txn, + platform_version, + worker_id, + total_workers, + ) + .await?; for (tenant_id, pipeline) in pipelines { - // skip the pipelines that are not assigned to this worker - if !is_pipeline_assigned_to_worker(pipeline.id, worker_id as u64, total_workers as u64) - { - continue; - } - - if pipeline.deployment_resources_status == ResourcesStatus::Stopped { - if pipeline.platform_version == platform_version { - if pipeline.program_status == ProgramStatus::CompilingSql { - operations::pipeline::set_program_status( - &txn, - tenant_id, - pipeline.id, - pipeline.program_version, - &ProgramStatus::Pending, - &None, - &None, - &None, - &None, - &None, - &None, - &None, - ) - .await?; - } - } else if pipeline.program_status == ProgramStatus::Pending - || pipeline.program_status == ProgramStatus::CompilingSql - { - operations::pipeline::update_pipeline( - &txn, - true, // Done by compiler - tenant_id, - &pipeline.name, - &None, - &None, - platform_version, - true, - &None, - &None, - &None, - &None, - &None, - ) - .await?; - } + if pipeline.platform_version == platform_version { + operations::pipeline::set_program_status( + &txn, + tenant_id, + pipeline.id, + pipeline.program_version, + &ProgramStatus::Pending, + &None, + &None, + &None, + &None, + &None, + &None, + &None, + ) + .await?; + } else { + operations::pipeline::update_pipeline( + &txn, + true, // Done by compiler + tenant_id, + &pipeline.name, + &None, + &None, + platform_version, + true, + &None, + &None, + &None, + &None, + &None, + ) + .await?; } } txn.commit().await?; @@ -1195,61 +1221,53 @@ impl Storage for StoragePostgres { let mut client = self.pool.get().await?; let txn = client.transaction().await?; let pipelines = - operations::pipeline::list_pipelines_across_all_tenants_for_monitoring(&txn).await?; + operations::pipeline::list_pipelines_across_all_tenants_clear_rust_compilation( + &txn, + platform_version, + worker_id, + 
total_workers, + ) + .await?; for (tenant_id, pipeline) in pipelines { - // skip the pipelines that are not assigned to this worker - if !is_pipeline_assigned_to_worker(pipeline.id, worker_id as u64, total_workers as u64) - { - continue; - } - - if pipeline.deployment_resources_status == ResourcesStatus::Stopped { - if pipeline.platform_version == platform_version { - if pipeline.program_status == ProgramStatus::CompilingRust { - // Because `program_info` can be rather large, it is only fetched when - // the program status needs to be reset to `SqlCompiled` - let pipeline_complete = - operations::pipeline::get_pipeline_by_id(&txn, tenant_id, pipeline.id) - .await?; - operations::pipeline::set_program_status( - &txn, - tenant_id, - pipeline.id, - pipeline.program_version, - &ProgramStatus::SqlCompiled, - &Some(pipeline_complete.program_error.sql_compilation.clone().expect("program_error.sql_compilation must be present if current status is CompilingRust")), - &None, - &None, - &Some(pipeline_complete.program_info.clone().expect( - "program_info must be present if current status is CompilingRust", - )), - &None, - &None, - &None, - ) + if pipeline.platform_version == platform_version { + let pipeline_complete = + operations::pipeline::get_pipeline_by_id(&txn, tenant_id, pipeline.id, true) .await?; - } - } else if pipeline.program_status == ProgramStatus::SqlCompiled - || pipeline.program_status == ProgramStatus::CompilingRust - { - operations::pipeline::update_pipeline( - &txn, - true, // Done by compiler - tenant_id, - &pipeline.name, - &None, - &None, - platform_version, - true, - &None, - &None, - &None, - &None, - &None, - ) - .await?; - } + operations::pipeline::set_program_status( + &txn, + tenant_id, + pipeline.id, + pipeline.program_version, + &ProgramStatus::SqlCompiled, + &Some(pipeline_complete.program_error.sql_compilation.clone().expect("program_error.sql_compilation must be present if current status is CompilingRust")), + &None, + &None, + 
&Some(pipeline_complete.program_info.clone().expect( + "program_info must be present if current status is CompilingRust", + )), + &None, + &None, + &None, + ) + .await?; + } else { + operations::pipeline::update_pipeline( + &txn, + true, // Done by compiler + tenant_id, + &pipeline.name, + &None, + &None, + platform_version, + true, + &None, + &None, + &None, + &None, + &None, + ) + .await?; } } txn.commit().await?; @@ -1302,10 +1320,19 @@ impl Storage for StoragePostgres { how_many: u64, ) -> Result<(ExtendedPipelineDescrMonitoring, Vec), DBError> { let mut client = self.pool.get().await?; - let txn = client.transaction().await?; - let pipeline = - operations::pipeline::get_pipeline_for_monitoring(&txn, tenant_id, pipeline_name) - .await?; + let txn = client + .build_transaction() + .isolation_level(IsolationLevel::RepeatableRead) + .read_only(true) + .start() + .await?; + let pipeline = operations::pipeline::get_pipeline_for_monitoring( + &txn, + tenant_id, + pipeline_name, + false, + ) + .await?; let bundle_data = operations::pipeline::get_support_bundle_data(&txn, pipeline.id, how_many).await?; txn.commit().await?; diff --git a/crates/pipeline-manager/src/db/test.rs b/crates/pipeline-manager/src/db/test.rs index 6b820cbd92b..fc1bbb03398 100644 --- a/crates/pipeline-manager/src/db/test.rs +++ b/crates/pipeline-manager/src/db/test.rs @@ -2,6 +2,7 @@ use crate::api::support_data_collector::SupportBundleData; use crate::auth::{generate_api_key, TenantRecord}; use crate::db::error::DBError; use crate::db::error::DBError::InvalidResourcesStatusNotRemain; +use crate::db::operations::pipeline::get_pipeline_by_id_for_monitoring; use crate::db::storage::{ExtendedPipelineDescrRunner, Storage}; use crate::db::storage_postgres::{is_pipeline_assigned_to_worker, StoragePostgres}; use crate::db::types::api_key::{ApiKeyDescr, ApiKeyId, ApiPermission}; @@ -53,7 +54,8 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use std::vec; -use tokio::sync::Mutex; +use 
tokio::spawn; +use tokio::sync::{oneshot, Mutex}; use tokio::time::sleep; use tracing::info; use uuid::Uuid; @@ -2712,6 +2714,165 @@ async fn pipeline_provision_version_guard() { .unwrap(); } +#[tokio::test] +async fn pipeline_concurrent_access_stall() { + let handle = test_setup().await; + let tenant_id = TenantRecord::default().id; + + // Create pipeline + let pipeline = handle + .db + .new_pipeline( + tenant_id, + Uuid::now_v7(), + "v0", + PipelineDescr { + name: "example1".to_string(), + description: "d1".to_string(), + runtime_config: json!({}), + program_code: "c1".to_string(), + udf_rust: "r1".to_string(), + udf_toml: "t2".to_string(), + program_config: json!({}), + }, + ) + .await + .unwrap(); + + // Client to create transactions with + let mut client1 = handle.db.pool.get().await.unwrap(); + let mut client2 = handle.db.pool.get().await.unwrap(); + + // Non-conflicting + for (row_lock1, row_lock2) in [(false, false), (true, false), (false, true)] { + let txn1 = client1.transaction().await.unwrap(); + let txn2 = client2.transaction().await.unwrap(); + get_pipeline_by_id_for_monitoring(&txn1, tenant_id, pipeline.id, row_lock1) + .await + .unwrap(); + get_pipeline_by_id_for_monitoring(&txn2, tenant_id, pipeline.id, row_lock2) + .await + .unwrap(); + txn1.commit().await.unwrap(); + txn2.commit().await.unwrap(); + } + + // Conflicting + let txn1 = client1.transaction().await.unwrap(); + let txn2 = client2.transaction().await.unwrap(); + get_pipeline_by_id_for_monitoring(&txn1, tenant_id, pipeline.id, true) + .await + .unwrap(); + txn2.execute("SET LOCAL statement_timeout = 1000", &[]) + .await + .unwrap(); + let error = get_pipeline_by_id_for_monitoring(&txn2, tenant_id, pipeline.id, true) + .await + .unwrap_err(); + let DBError::PostgresError { error, .. 
} = error else { + unreachable!(); + }; + let sql_state = error.as_db_error().unwrap().code(); + assert!(matches!( + sql_state, + &tokio_postgres::error::SqlState::QUERY_CANCELED + )); + txn1.commit().await.unwrap(); + txn2.commit().await.unwrap(); +} + +#[tokio::test] +async fn pipeline_concurrent_access_deadlock() { + let handle = test_setup().await; + let tenant_id = TenantRecord::default().id; + + // Create pipeline 1 + let pipeline1 = handle + .db + .new_pipeline( + tenant_id, + Uuid::now_v7(), + "v0", + PipelineDescr { + name: "example1".to_string(), + description: "d1".to_string(), + runtime_config: json!({}), + program_code: "c1".to_string(), + udf_rust: "r1".to_string(), + udf_toml: "t2".to_string(), + program_config: json!({}), + }, + ) + .await + .unwrap(); + + // Create pipeline 2 + let pipeline2 = handle + .db + .new_pipeline( + tenant_id, + Uuid::now_v7(), + "v0", + PipelineDescr { + name: "example2".to_string(), + description: "d1".to_string(), + runtime_config: json!({}), + program_code: "c1".to_string(), + udf_rust: "r1".to_string(), + udf_toml: "t2".to_string(), + program_config: json!({}), + }, + ) + .await + .unwrap(); + + // Deadlock: + // - T2 locks pipeline 2 + // - T1 locks pipeline 1 + // - T1 tries to lock pipeline 2 -> waits or deadlock + // - T2 tries to lock pipeline 1 -> waits or deadlock + let mut client1 = handle.db.pool.get().await.unwrap(); + let mut client2 = handle.db.pool.get().await.unwrap(); + let txn2 = client2.transaction().await.unwrap(); + get_pipeline_by_id_for_monitoring(&txn2, tenant_id, pipeline2.id, true) + .await + .unwrap(); + let (tx, rx) = oneshot::channel::<()>(); + let join_handle = spawn(async move { + let txn1 = client1.transaction().await.unwrap(); + get_pipeline_by_id_for_monitoring(&txn1, tenant_id, pipeline1.id, true) + .await + .unwrap(); + tx.send(()).unwrap(); + if let Err(e) = + get_pipeline_by_id_for_monitoring(&txn1, tenant_id, pipeline2.id, true).await + { + return Some(e); + } + 
txn1.commit().await.unwrap(); + return None; + }); + rx.await.unwrap(); + let t2_error = if let Err(e) = + get_pipeline_by_id_for_monitoring(&txn2, tenant_id, pipeline1.id, true).await + { + Some(e) + } else { + None + }; + let t1_error = join_handle.await.unwrap(); + assert!(!(t1_error.is_some() && t2_error.is_some())); + let error = t1_error.unwrap_or_else(|| t2_error.unwrap()); + let DBError::PostgresError { error, .. } = error else { + unreachable!(); + }; + let sql_state = error.as_db_error().unwrap().code(); + assert!(matches!( + sql_state, + &tokio_postgres::error::SqlState::T_R_DEADLOCK_DETECTED + )); +} + ////////////////////////////////////////////////////////////////////////////// ///// PROP TESTS ///// diff --git a/python/tests/platform/test_pipeline_lifecycle.py b/python/tests/platform/test_pipeline_lifecycle.py index 57dbbe7cf9b..cd180170ca0 100644 --- a/python/tests/platform/test_pipeline_lifecycle.py +++ b/python/tests/platform/test_pipeline_lifecycle.py @@ -186,7 +186,7 @@ def test_pipeline_deleted_during_program_compilation(pipeline_name): @gen_pipeline_name -def test_pipeline_stop_force_after_start(pipeline_name): +def test_pipeline_stop_with_force_after_start(pipeline_name): """ Start and then force stop after varying short delays. 
""" @@ -195,15 +195,10 @@ def test_pipeline_stop_force_after_start(pipeline_name): ).create_or_replace() for delay_sec in [0, 0.1, 0.5, 1, 3, 10, 20]: - print(f"Testing with {delay_sec} second delay") - - # Issue non-blocking start pipeline.start(wait=False) - - # Shortly wait for the pipeline to transition to next state(s) time.sleep(delay_sec) - - # Stop force and clear the pipeline + pipeline.stop(force=True) + pipeline.start(wait=False) pipeline.stop(force=True) pipeline.clear_storage() @@ -215,73 +210,85 @@ def test_pipeline_stop_with_force(pipeline_name): """ create_pipeline(pipeline_name, "") - # Already stopped force + # Already stopped stop_pipeline(pipeline_name, force=True) - # Start then immediate stop (force) - # - # We do not wait for the pipeline to start, but we do wait for it - # to transition away from "Stopped". Otherwise, there is a race: - # - # - Request start. - # - # - Request force stop. - # - # - Check for "stopped" status succeeds because starting up is - # taking a little while, so we move along to - # start_pipeline_as_paused(). - # - # - Pipeline transitions to "stopping". - # - # - start_pipeline_as_paused() fails with "Cannot restart the - # pipeline while it is stopping. Wait until it is stopped before - # starting the pipeline again." 
+ # Start (don't wait), immediately stop start_pipeline(pipeline_name, wait=False) - pipeline = Pipeline.get(pipeline_name, TEST_CLIENT) - wait_for_condition( - f"{pipeline_name} no longer stopped", - lambda: pipeline.status() != PipelineStatus.STOPPED, - timeout_s=30.0, - poll_interval_s=0.2, - ) stop_pipeline(pipeline_name, force=True) - # Start paused then stop (simulate by pausing immediately) + # Start (wait for running), stop + start_pipeline(pipeline_name) + stop_pipeline(pipeline_name, force=True) + + # Start (wait for paused), stop start_pipeline_as_paused(pipeline_name) stop_pipeline(pipeline_name, force=True) - # Start, stop (without waiting), then stop again + # Start (wait for running), stop twice in a row start_pipeline(pipeline_name) stop_pipeline(pipeline_name, force=True, wait=False) stop_pipeline(pipeline_name, force=True) + # Stopping must complete before starting again. + # + # It is possible that the stopping happens very quickly, as such only if an error occurs, + # is it checked that it is the correct error code. + start_pipeline(pipeline_name) + stop_pipeline(pipeline_name, force=True, wait=False) + error = None + try: + start_pipeline(pipeline_name) + except FelderaAPIError as e: + error = e + if error is not None: + assert error.error_code == "IllegalPipelineAction" + stop_pipeline(pipeline_name, force=True, wait=False) + + +@enterprise_only +@gen_pipeline_name +def test_pipeline_stop_without_force_after_start(pipeline_name): + """ + Start and then stop after varying short delays. 
+ """ + pipeline = PipelineBuilder( + TEST_CLIENT, pipeline_name, "CREATE TABLE t1(c1 INTEGER);" + ).create_or_replace() + + for delay_sec in [0, 0.1, 0.5, 1, 3, 10, 20]: + pipeline.start(wait=False) + time.sleep(delay_sec) + pipeline.stop(force=False) + pipeline.start(wait=False) + pipeline.stop(force=False) + pipeline.clear_storage() + @enterprise_only @gen_pipeline_name def test_pipeline_stop_without_force(pipeline_name): """ - Same sequences but without force (Enterprise only). + Sequences of starting/stopping without force (Enterprise only). """ create_pipeline(pipeline_name, "") # Already stopped stop_pipeline(pipeline_name, force=False) - # Start then stop (non-force) - # - # See test_pipeline_stop_with_force() for notes. + # Start (don't wait), immediately stop start_pipeline(pipeline_name, wait=False) stop_pipeline(pipeline_name, force=False) - # Start, wait for running, stop + # Start (wait for running), stop start_pipeline(pipeline_name) stop_pipeline(pipeline_name, force=False) - # Start paused (pause right away), stop + # Start (wait for paused), stop start_pipeline_as_paused(pipeline_name) stop_pipeline(pipeline_name, force=False) - # Start, stop twice in a row + # Start (wait for running), stop twice in a row start_pipeline(pipeline_name) stop_pipeline(pipeline_name, force=False, wait=False) stop_pipeline(pipeline_name, force=False)