Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 134 additions & 19 deletions crates/pipeline-manager/src/db/operations/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ pub(crate) async fn get_pipeline(
txn: &Transaction<'_>,
tenant_id: TenantId,
name: &str,
row_lock: bool,
) -> Result<ExtendedPipelineDescr, DBError> {
let stmt = txn
.prepare_cached(&format!(
"SELECT {PIPELINE_COLUMNS_ALL}
FROM pipeline AS p
WHERE p.tenant_id = $1 AND p.name = $2
"
{}
",
if row_lock { "FOR UPDATE" } else { "" }
))
.await?;
let row = txn.query_opt(&stmt, &[&tenant_id.0, &name]).await?.ok_or(
Expand All @@ -103,13 +106,16 @@ pub(crate) async fn get_pipeline_for_monitoring(
txn: &Transaction<'_>,
tenant_id: TenantId,
name: &str,
row_lock: bool,
) -> Result<ExtendedPipelineDescrMonitoring, DBError> {
let stmt = txn
.prepare_cached(&format!(
"SELECT {PIPELINE_COLUMNS_MONITORING}
FROM pipeline AS p
WHERE p.tenant_id = $1 AND p.name = $2
"
{}
",
if row_lock { "FOR UPDATE" } else { "" }
))
.await?;
let row = txn.query_opt(&stmt, &[&tenant_id.0, &name]).await?.ok_or(
Expand All @@ -125,13 +131,16 @@ async fn internal_get_pipeline_by_id(
tenant_id: TenantId,
pipeline_id: PipelineId,
fields: &'static str,
row_lock: bool,
) -> Result<Row, DBError> {
let stmt = txn
.prepare_cached(&format!(
"SELECT {fields}
FROM pipeline AS p
WHERE p.tenant_id = $1 AND p.id = $2
"
{}
",
if row_lock { "FOR UPDATE" } else { "" }
))
.await?;
txn.query_opt(&stmt, &[&tenant_id.0, &pipeline_id.0])
Expand All @@ -143,29 +152,45 @@ pub async fn get_pipeline_by_id(
txn: &Transaction<'_>,
tenant_id: TenantId,
pipeline_id: PipelineId,
row_lock: bool,
) -> Result<ExtendedPipelineDescr, DBError> {
let row =
internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_ALL).await?;
internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_ALL, row_lock)
.await?;
parse_pipeline_row_all(&row)
}

pub async fn get_pipeline_by_id_for_monitoring(
txn: &Transaction<'_>,
tenant_id: TenantId,
pipeline_id: PipelineId,
row_lock: bool,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems odd that the `*_for_monitoring` functions would need to lock the row.

) -> Result<ExtendedPipelineDescrMonitoring, DBError> {
let row = internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_MONITORING)
.await?;
let row = internal_get_pipeline_by_id(
txn,
tenant_id,
pipeline_id,
PIPELINE_COLUMNS_MONITORING,
row_lock,
)
.await?;
parse_pipeline_row_monitoring(&row)
}

pub async fn get_pipeline_by_id_for_event_info(
txn: &Transaction<'_>,
tenant_id: TenantId,
pipeline_id: PipelineId,
row_lock: bool,
) -> Result<ExtendedPipelineDescrEventInfo, DBError> {
let row = internal_get_pipeline_by_id(txn, tenant_id, pipeline_id, PIPELINE_COLUMNS_EVENT_INFO)
.await?;
let row = internal_get_pipeline_by_id(
txn,
tenant_id,
pipeline_id,
PIPELINE_COLUMNS_EVENT_INFO,
row_lock,
)
.await?;
parse_pipeline_row_event_info(&row)
}

Expand Down Expand Up @@ -342,7 +367,7 @@ pub(crate) async fn update_pipeline(

// Fetch current pipeline to decide how to update.
// This will also return an error if the pipeline does not exist.
let current = get_pipeline(txn, tenant_id, original_name).await?;
let current = get_pipeline(txn, tenant_id, original_name, true).await?;

// Pipeline update is allowed if either:
// - Current status is `Stopped` AND desired status is `Stopped`
Expand Down Expand Up @@ -570,7 +595,7 @@ pub(crate) async fn delete_pipeline(
tenant_id: TenantId,
name: &str,
) -> Result<PipelineId, DBError> {
let current = get_pipeline(txn, tenant_id, name).await?;
let current = get_pipeline(txn, tenant_id, name, true).await?;

// Pipeline deletion is only possible if it is fully stopped
if current.deployment_resources_status != ResourcesStatus::Stopped
Expand Down Expand Up @@ -611,7 +636,7 @@ pub(crate) async fn set_program_status(
new_program_binary_integrity_checksum: &Option<String>,
new_program_info_integrity_checksum: &Option<String>,
) -> Result<(), DBError> {
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?;

// Only if the program whose status is being transitioned is the same one can it be updated
if current.program_version != program_version_guard {
Expand Down Expand Up @@ -874,7 +899,7 @@ pub(crate) async fn dismiss_deployment_error(
tenant_id: TenantId,
pipeline_name: &str,
) -> Result<(), DBError> {
let current = get_pipeline(txn, tenant_id, pipeline_name).await?;
let current = get_pipeline(txn, tenant_id, pipeline_name, true).await?;

// If an error has to be cleared, then it is only possible if it is fully stopped
if (current.deployment_resources_status != ResourcesStatus::Stopped
Expand Down Expand Up @@ -912,7 +937,7 @@ pub(crate) async fn set_deployment_resources_desired_status(
bootstrap_config: Option<BootstrapConfig>,
dismiss_error: bool,
) -> Result<PipelineId, DBError> {
let current = get_pipeline(txn, tenant_id, pipeline_name).await?;
let current = get_pipeline(txn, tenant_id, pipeline_name, true).await?;

// Deployment error will be automatically dismissed if this is wanted
let final_deployment_error = if dismiss_error {
Expand Down Expand Up @@ -1054,7 +1079,7 @@ async fn check_version_guard_and_transition_deployment_resources_status(
new_deployment_resources_status: ResourcesStatus,
remain: bool,
) -> Result<ExtendedPipelineDescrMonitoring, DBError> {
let current = get_pipeline_by_id_for_monitoring(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id_for_monitoring(txn, tenant_id, pipeline_id, true).await?;

// Use the version guard to check that the deployment is the intended one
if current.version != version_guard {
Expand Down Expand Up @@ -1124,7 +1149,7 @@ pub(crate) async fn set_deployment_resources_status_stopped(
false,
)
.await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?;

set_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1200,7 +1225,7 @@ pub(crate) async fn set_deployment_resources_status_provisioning(
false,
)
.await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?;

set_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1273,7 +1298,7 @@ pub(crate) async fn set_deployment_resources_status_provisioned(
false,
)
.await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?;

set_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1347,7 +1372,7 @@ pub(crate) async fn set_deployment_resources_status_stopping(
false,
)
.await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?;

set_deployment_resources_status(
txn,
Expand Down Expand Up @@ -1570,7 +1595,7 @@ pub(crate) async fn set_storage_status(
pipeline_id: PipelineId,
new_storage_status: StorageStatus,
) -> Result<(), DBError> {
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id).await?;
let current = get_pipeline_by_id(txn, tenant_id, pipeline_id, true).await?;

// Check that the transition is permitted
validate_storage_status_transition(
Expand Down Expand Up @@ -1686,6 +1711,96 @@ pub(crate) async fn list_pipelines_across_all_tenants_for_monitoring(
Ok(result)
}

/// Retrieves a list of pipelines across all tenants that are candidates to have their SQL

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL Rust? Not just Rust?
The function name is just "sql".
What is "cleared"?

/// compilation cleared.
pub(crate) async fn list_pipelines_across_all_tenants_clear_sql_compilation(
txn: &Transaction<'_>,
platform_version: &str,
worker_id: usize,
total_workers: usize,
) -> Result<Vec<(TenantId, ExtendedPipelineDescrMonitoring)>, DBError> {
// See `get_next_sql_compilation` for an explanation of the worker-ID matching expression.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is "function explanation"?

let stmt = txn
.prepare_cached(&format!(
"SELECT p.tenant_id, {PIPELINE_COLUMNS_MONITORING}
FROM pipeline AS p
WHERE p.deployment_resources_status = 'stopped'
AND (
(p.platform_version = $1 AND p.program_status = 'compiling_sql')
OR
(p.platform_version != $1 AND (p.program_status = 'pending' OR p.program_status = 'compiling_sql'))
)
AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very specific formula, which I suspect has to match everywhere. Can you pull it into a reusable function somehow?

ORDER BY p.id ASC
FOR UPDATE
"
))
.await?;
let rows: Vec<Row> = txn
.query(
&stmt,
&[
&platform_version.to_string(),
&(total_workers as i64),
&(worker_id as i64),
],
)
.await?;
let mut result = Vec::with_capacity(rows.len());
for row in rows {
result.push((
TenantId(row.get("tenant_id")),
parse_pipeline_row_monitoring(&row)?,
));
}
Ok(result)
}

/// Retrieves a list of pipelines across all tenants that are candidates to have their Rust
/// compilation cleared.
pub(crate) async fn list_pipelines_across_all_tenants_clear_rust_compilation(
txn: &Transaction<'_>,
platform_version: &str,
worker_id: usize,
total_workers: usize,
) -> Result<Vec<(TenantId, ExtendedPipelineDescrMonitoring)>, DBError> {
// See `get_next_sql_compilation` for the worker id matching function explanation.
let stmt = txn
.prepare_cached(&format!(
"SELECT p.tenant_id, {PIPELINE_COLUMNS_MONITORING}
FROM pipeline AS p
WHERE p.deployment_resources_status = 'stopped'
AND (
(p.platform_version = $1 AND p.program_status = 'compiling_rust')
OR
(p.platform_version != $1 AND (p.program_status = 'sql_compiled' OR p.program_status = 'compiling_rust'))
)
AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3
ORDER BY p.id ASC
FOR UPDATE
"
))
.await?;
let rows: Vec<Row> = txn
.query(
&stmt,
&[
&platform_version.to_string(),
&(total_workers as i64),
&(worker_id as i64),
],
)
.await?;
let mut result = Vec::with_capacity(rows.len());
for row in rows {
result.push((
TenantId(row.get("tenant_id")),
parse_pipeline_row_monitoring(&row)?,
));
}
Ok(result)
}

/// Retrieves the pipeline which is stopped, whose program status has been Pending
/// for the longest, and is of the current platform version. Returns `None` if none is found.
pub(crate) async fn get_next_sql_compilation(
Expand Down
14 changes: 7 additions & 7 deletions crates/pipeline-manager/src/db/operations/pipeline_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) async fn get_pipeline_monitor_event_short(
pipeline_name: String,
event_id: PipelineMonitorEventId,
) -> Result<PipelineMonitorEvent, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give other people some guidelines about when to use locking?
They should probably go in the doc comments of the innermost functions that take this flag.

.await?
.id;

Expand All @@ -51,7 +51,7 @@ pub(crate) async fn get_pipeline_monitor_event_extended(
pipeline_name: String,
event_id: PipelineMonitorEventId,
) -> Result<ExtendedPipelineMonitorEvent, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)
.await?
.id;

Expand All @@ -76,7 +76,7 @@ pub(crate) async fn get_latest_pipeline_monitor_event_short(
tenant_id: TenantId,
pipeline_name: String,
) -> Result<PipelineMonitorEvent, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)
.await?
.id;

Expand All @@ -103,7 +103,7 @@ pub(crate) async fn get_latest_pipeline_monitor_event_extended(
tenant_id: TenantId,
pipeline_name: String,
) -> Result<ExtendedPipelineMonitorEvent, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)
.await?
.id;

Expand All @@ -130,7 +130,7 @@ pub(crate) async fn list_pipeline_monitor_events_short(
tenant_id: TenantId,
pipeline_name: String,
) -> Result<Vec<PipelineMonitorEvent>, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)
.await?
.id;

Expand All @@ -157,7 +157,7 @@ pub(crate) async fn list_pipeline_monitor_events_extended(
tenant_id: TenantId,
pipeline_name: String,
) -> Result<Vec<ExtendedPipelineMonitorEvent>, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)
.await?
.id;

Expand Down Expand Up @@ -185,7 +185,7 @@ pub(crate) async fn new_pipeline_monitor_event(
pipeline_id: PipelineId,
new_event_id: Uuid,
) -> Result<(), DBError> {
let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id).await?;
let pipeline = get_pipeline_by_id_for_event_info(txn, tenant_id, pipeline_id, true).await?;

let stmt = txn
.prepare_cached(
Expand Down
Loading
Loading