-
Notifications
You must be signed in to change notification settings - Fork 133
pipeline-manager: fix row locking #6500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,13 +82,16 @@ pub(crate) async fn get_pipeline( | |
| txn: &Transaction<'_>, | ||
| tenant_id: TenantId, | ||
| name: &str, | ||
| row_lock: bool, | ||
| ) -> Result<ExtendedPipelineDescr, DBError> { | ||
| let stmt = txn | ||
| .prepare_cached(&format!( | ||
| "SELECT {PIPELINE_COLUMNS_ALL} | ||
| FROM pipeline AS p | ||
| WHERE p.tenant_id = $1 AND p.name = $2 | ||
| " | ||
| {} | ||
| ", | ||
| if row_lock { "FOR UPDATE" } else { "" } | ||
| )) | ||
| .await?; | ||
| let row = txn.query_opt(&stmt, &[&tenant_id.0, &name]).await?.ok_or( | ||
|
|
@@ -103,13 +106,16 @@ pub(crate) async fn get_pipeline_for_monitoring( | |
| txn: &Transaction<'_>, | ||
| tenant_id: TenantId, | ||
| name: &str, | ||
| row_lock: bool, | ||
| ) -> Result<ExtendedPipelineDescrMonitoring, DBError> { | ||
| let stmt = txn | ||
| .prepare_cached(&format!( | ||
| "SELECT {PIPELINE_COLUMNS_MONITORING} | ||
| FROM pipeline AS p | ||
| WHERE p.tenant_id = $1 AND p.name = $2 | ||
| " | ||
| {} | ||
| ", | ||
| if row_lock { "FOR UPDATE" } else { "" } | ||
| )) | ||
| .await?; | ||
| let row = txn.query_opt(&stmt, &[&tenant_id.0, &name]).await?.ok_or( | ||
|
|
@@ -125,13 +131,16 @@ async fn internal_get_pipeline_by_id( | |
| tenant_id: TenantId, | ||
| pipeline_id: PipelineId, | ||
| fields: &'static str, | ||
| row_lock: bool, | ||
| ) -> Result<Row, DBError> { | ||
| let stmt = txn | ||
| .prepare_cached(&format!( | ||
| "SELECT {fields} | ||
| FROM pipeline AS p | ||
| WHERE p.tenant_id = $1 AND p.id = $2 | ||
| " | ||
| {} | ||
| ", | ||
| if row_lock { "FOR UPDATE" } else { "" } | ||
| )) | ||
| .await?; | ||
| txn.query_opt(&stmt, &[&tenant_id.0, &pipeline_id.0]) | ||
|
|
@@ -143,29 +152,45 @@ pub async fn get_pipeline_by_id( | |
| txn: &Transaction<'_>, | ||
| tenant_id: TenantId, | ||
| pipeline_id: PipelineId, | ||
| row_lock: bool, | ||
| ) -> Result<ExtendedPipelineDescr, DBError> { | ||
| let row = | ||
| internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_ALL).await?; | ||
| internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_ALL, row_lock) | ||
| .await?; | ||
| parse_pipeline_row_all(&row) | ||
| } | ||
|
|
||
| pub async fn get_pipeline_by_id_for_monitoring( | ||
| txn: &Transaction<'_>, | ||
| tenant_id: TenantId, | ||
| pipeline_id: PipelineId, | ||
| row_lock: bool, | ||
| ) -> Result<ExtendedPipelineDescrMonitoring, DBError> { | ||
| let row = internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_MONITORING) | ||
| .await?; | ||
| let row = internal_get_pipeline_by_id( | ||
| txn, | ||
| tenant_id, | ||
| pipeline_id, | ||
| PIPELINE_COLUMNS_MONITORING, | ||
| row_lock, | ||
| ) | ||
| .await?; | ||
| parse_pipeline_row_monitoring(&row) | ||
| } | ||
|
|
||
| pub async fn get_pipeline_by_id_for_event_info( | ||
| txn: &Transaction<'_>, | ||
| tenant_id: TenantId, | ||
| pipeline_id: PipelineId, | ||
| row_lock: bool, | ||
| ) -> Result<ExtendedPipelineDescrEventInfo, DBError> { | ||
| let row = internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_EVENT_INFO) | ||
| .await?; | ||
| let row = internal_get_pipeline_by_id( | ||
| txn, | ||
| tenant_id, | ||
| pipeline_id, | ||
| PIPELINE_COLUMNS_EVENT_INFO, | ||
| row_lock, | ||
| ) | ||
| .await?; | ||
| parse_pipeline_row_event_info(&row) | ||
| } | ||
|
|
||
|
|
@@ -342,7 +367,7 @@ pub(crate) async fn update_pipeline( | |
|
|
||
| // Fetch current pipeline to decide how to update. | ||
| // This will also return an error if the pipeline does not exist. | ||
| let current = get_pipeline(txn, tenant_id, original_name).await?; | ||
| let current = get_pipeline(txn, tenant_id, original_name, true).await?; | ||
|
|
||
| // Pipeline update is allowed if either: | ||
| // - Current status is `Stopped` AND desired status is `Stopped` | ||
|
|
@@ -570,7 +595,7 @@ pub(crate) async fn delete_pipeline( | |
| tenant_id: TenantId, | ||
| name: &str, | ||
| ) -> Result<PipelineId, DBError> { | ||
| let current = get_pipeline(txn, tenant_id, name).await?; | ||
| let current = get_pipeline(txn, tenant_id, name, true).await?; | ||
|
|
||
| // Pipeline deletion is only possible if it is fully stopped | ||
| if current.deployment_resources_status != ResourcesStatus::Stopped | ||
|
|
@@ -611,7 +636,7 @@ pub(crate) async fn set_program_status( | |
| new_program_binary_integrity_checksum: &Option<String>, | ||
| new_program_info_integrity_checksum: &Option<String>, | ||
| ) -> Result<(), DBError> { | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| // Only if the program whose status is being transitioned is the same one can it be updated | ||
| if current.program_version != program_version_guard { | ||
|
|
@@ -874,7 +899,7 @@ pub(crate) async fn dismiss_deployment_error( | |
| tenant_id: TenantId, | ||
| pipeline_name: &str, | ||
| ) -> Result<(), DBError> { | ||
| let current = get_pipeline(txn, tenant_id, pipeline_name).await?; | ||
| let current = get_pipeline(txn, tenant_id, pipeline_name, true).await?; | ||
|
|
||
| // If an error has to be cleared, then it is only possible if it is fully stopped | ||
| if (current.deployment_resources_status != ResourcesStatus::Stopped | ||
|
|
@@ -912,7 +937,7 @@ pub(crate) async fn set_deployment_resources_desired_status( | |
| bootstrap_config: Option<BootstrapConfig>, | ||
| dismiss_error: bool, | ||
| ) -> Result<PipelineId, DBError> { | ||
| let current = get_pipeline(txn, tenant_id, pipeline_name).await?; | ||
| let current = get_pipeline(txn, tenant_id, pipeline_name, true).await?; | ||
|
|
||
| // Deployment error will be automatically dismissed if this is wanted | ||
| let final_deployment_error = if dismiss_error { | ||
|
|
@@ -1054,7 +1079,7 @@ async fn check_version_guard_and_transition_deployment_resources_status( | |
| new_deployment_resources_status: ResourcesStatus, | ||
| remain: bool, | ||
| ) -> Result<ExtendedPipelineDescrMonitoring, DBError> { | ||
| let current = get_pipeline_by_id_for_monitoring(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id_for_monitoring(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| // Use the version guard to check that the deployment is the intended one | ||
| if current.version != version_guard { | ||
|
|
@@ -1124,7 +1149,7 @@ pub(crate) async fn set_deployment_resources_status_stopped( | |
| false, | ||
| ) | ||
| .await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| set_deployment_resources_status( | ||
| txn, | ||
|
|
@@ -1200,7 +1225,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning( | |
| false, | ||
| ) | ||
| .await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| set_deployment_resources_status( | ||
| txn, | ||
|
|
@@ -1273,7 +1298,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned( | |
| false, | ||
| ) | ||
| .await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| set_deployment_resources_status( | ||
| txn, | ||
|
|
@@ -1347,7 +1372,7 @@ pub(crate) async fn set_deployment_resources_status_stopping( | |
| false, | ||
| ) | ||
| .await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| set_deployment_resources_status( | ||
| txn, | ||
|
|
@@ -1570,7 +1595,7 @@ pub(crate) async fn set_storage_status( | |
| pipeline_id: PipelineId, | ||
| new_storage_status: StorageStatus, | ||
| ) -> Result<(), DBError> { | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?; | ||
| let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| // Check that the transition is permitted | ||
| validate_storage_status_transition( | ||
|
|
@@ -1686,6 +1711,96 @@ pub(crate) async fn list_pipelines_across_all_tenants_for_monitoring( | |
| Ok(result) | ||
| } | ||
|
|
||
| /// Retrieves a list of pipelines across all tenants that are candidates to have SQL Rust | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL Rust? Not just Rust? |
||
| /// compilation cleared. | ||
| pub(crate) async fn list_pipelines_across_all_tenants_clear_sql_compilation( | ||
| txn: &Transaction<'_>, | ||
| platform_version: &str, | ||
| worker_id: usize, | ||
| total_workers: usize, | ||
| ) -> Result<Vec<(TenantId, ExtendedPipelineDescrMonitoring)>, DBError> { | ||
| // See `get_next_sql_compilation` for the worker id matching function explanation. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is "function explanation"? |
||
| let stmt = txn | ||
| .prepare_cached(&format!( | ||
| "SELECT p.tenant_id, {PIPELINE_COLUMNS_MONITORING} | ||
| FROM pipeline AS p | ||
| WHERE p.deployment_resources_status = 'stopped' | ||
| AND ( | ||
| (p.platform_version = $1 AND p.program_status = 'compiling_sql') | ||
| OR | ||
| (p.platform_version != $1 AND (p.program_status = 'pending' OR p.program_status = 'compiling_sql')) | ||
| ) | ||
| AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a very specific formula, which I suspect has to match everywhere. Can you pull it into a reusable function somehow? |
||
| ORDER BY p.id ASC | ||
| FOR UPDATE | ||
| " | ||
| )) | ||
| .await?; | ||
| let rows: Vec<Row> = txn | ||
| .query( | ||
| &stmt, | ||
| &[ | ||
| &platform_version.to_string(), | ||
| &(total_workers as i64), | ||
| &(worker_id as i64), | ||
| ], | ||
| ) | ||
| .await?; | ||
| let mut result = Vec::with_capacity(rows.len()); | ||
| for row in rows { | ||
| result.push(( | ||
| TenantId(row.get("tenant_id")), | ||
| parse_pipeline_row_monitoring(&row)?, | ||
| )); | ||
| } | ||
| Ok(result) | ||
| } | ||
|
|
||
| /// Retrieves a list of pipelines across all tenants that are candidates to have their Rust | ||
| /// compilation cleared. | ||
| pub(crate) async fn list_pipelines_across_all_tenants_clear_rust_compilation( | ||
| txn: &Transaction<'_>, | ||
| platform_version: &str, | ||
| worker_id: usize, | ||
| total_workers: usize, | ||
| ) -> Result<Vec<(TenantId, ExtendedPipelineDescrMonitoring)>, DBError> { | ||
| // See `get_next_sql_compilation` for the worker id matching function explanation. | ||
| let stmt = txn | ||
| .prepare_cached(&format!( | ||
| "SELECT p.tenant_id, {PIPELINE_COLUMNS_MONITORING} | ||
| FROM pipeline AS p | ||
| WHERE p.deployment_resources_status = 'stopped' | ||
| AND ( | ||
| (p.platform_version = $1 AND p.program_status = 'compiling_rust') | ||
| OR | ||
| (p.platform_version != $1 AND (p.program_status = 'sql_compiled' OR p.program_status = 'compiling_rust')) | ||
| ) | ||
| AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3 | ||
| ORDER BY p.id ASC | ||
| FOR UPDATE | ||
| " | ||
| )) | ||
| .await?; | ||
| let rows: Vec<Row> = txn | ||
| .query( | ||
| &stmt, | ||
| &[ | ||
| &platform_version.to_string(), | ||
| &(total_workers as i64), | ||
| &(worker_id as i64), | ||
| ], | ||
| ) | ||
| .await?; | ||
| let mut result = Vec::with_capacity(rows.len()); | ||
| for row in rows { | ||
| result.push(( | ||
| TenantId(row.get("tenant_id")), | ||
| parse_pipeline_row_monitoring(&row)?, | ||
| )); | ||
| } | ||
| Ok(result) | ||
| } | ||
|
|
||
| /// Retrieves the pipeline which is stopped, whose program status has been Pending | ||
| /// for the longest, and is of the current platform version. Returns `None` if none is found. | ||
| pub(crate) async fn get_next_sql_compilation( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,7 @@ pub(crate) async fn get_pipeline_monitor_event_short( | |
| pipeline_name: String, | ||
| event_id: PipelineMonitorEventId, | ||
| ) -> Result<PipelineMonitorEvent, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you give some guidelines about when to use locking for other people? |
||
| .await? | ||
| .id; | ||
|
|
||
|
|
@@ -51,7 +51,7 @@ pub(crate) async fn get_pipeline_monitor_event_extended( | |
| pipeline_name: String, | ||
| event_id: PipelineMonitorEventId, | ||
| ) -> Result<ExtendedPipelineMonitorEvent, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) | ||
| .await? | ||
| .id; | ||
|
|
||
|
|
@@ -76,7 +76,7 @@ pub(crate) async fn get_latest_pipeline_monitor_event_short( | |
| tenant_id: TenantId, | ||
| pipeline_name: String, | ||
| ) -> Result<PipelineMonitorEvent, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) | ||
| .await? | ||
| .id; | ||
|
|
||
|
|
@@ -103,7 +103,7 @@ pub(crate) async fn get_latest_pipeline_monitor_event_extended( | |
| tenant_id: TenantId, | ||
| pipeline_name: String, | ||
| ) -> Result<ExtendedPipelineMonitorEvent, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) | ||
| .await? | ||
| .id; | ||
|
|
||
|
|
@@ -130,7 +130,7 @@ pub(crate) async fn list_pipeline_monitor_events_short( | |
| tenant_id: TenantId, | ||
| pipeline_name: String, | ||
| ) -> Result<Vec<PipelineMonitorEvent>, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) | ||
| .await? | ||
| .id; | ||
|
|
||
|
|
@@ -157,7 +157,7 @@ pub(crate) async fn list_pipeline_monitor_events_extended( | |
| tenant_id: TenantId, | ||
| pipeline_name: String, | ||
| ) -> Result<Vec<ExtendedPipelineMonitorEvent>, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) | ||
| .await? | ||
| .id; | ||
|
|
||
|
|
@@ -185,7 +185,7 @@ pub(crate) async fn new_pipeline_monitor_event( | |
| pipeline_id: PipelineId, | ||
| new_event_id: Uuid, | ||
| ) -> Result<(), DBError> { | ||
| let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id).await?; | ||
| let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id, true).await?; | ||
|
|
||
| let stmt = txn | ||
| .prepare_cached( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems odd that
*_for monitoringwould need to lock the row