pipeline-manager: fix row locking #6500
When a table row is retrieved in order to update it, the row itself is now locked by adding `FOR UPDATE` to the query. Although the current global lock on the API server's database connection prevents concurrent access from within the API server, the runner can separately access and change the rows (although only the fields that are in its domain). One such concurrent operation is setting the desired resources status and transitioning the current resources status based on it.

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
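To make the description concrete, here is a minimal sketch of how a lock flag can switch a lookup query between a plain read and a locking read. The function name, table name, and column names are hypothetical and not taken from the PR; only the idea of appending `FOR UPDATE` when the row is fetched for modification comes from the description.

```rust
/// Hypothetical helper (not the PR's actual code): builds the SELECT used to
/// fetch a pipeline row. `FOR UPDATE` is appended only when the caller intends
/// to modify the row inside the same transaction, so that a concurrent writer
/// blocks until this transaction commits or rolls back.
fn select_pipeline_query(row_lock: bool) -> String {
    // Table and column names here are assumptions made for illustration.
    let base = "SELECT p.id, p.name FROM pipeline AS p \
                WHERE p.tenant_id = $1 AND p.name = $2";
    if row_lock {
        format!("{base} FOR UPDATE")
    } else {
        base.to_string()
    }
}
```

Calling `select_pipeline_query(false)` yields the plain read used by read-only paths, while `select_pipeline_query(true)` yields the locking variant for read-modify-write paths. The two results are distinct query strings, so a statement cache keyed on the query text would hold them as separate entries.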
| event_id: PipelineMonitorEventId, | ||
| ) -> Result<PipelineMonitorEvent, DBError> { | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name) | ||
| let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false) |
Can you give some guidelines about when to use locking for other people?
It should probably be in the doc of the innermost functions which take this flag.
| // Convert PipelineId UUID to u64 to match PostgreSQL's behavior. | ||
| // This uses the first 8 bytes of the UUID converted to a big-endian u64. | ||
| // This ensures consistent worker assignment between Rust and SQL implementations. | ||
| #[allow(dead_code)] // Might be useful in the future to have equivalent of SQL implementation |
I don't understand this comment.
| pipeline.id, | ||
| pipeline.program_version, | ||
| &ProgramStatus::SqlCompiled, | ||
| &Some(pipeline_complete.program_error.sql_compilation.clone().expect("program_error.sql_compilation must be present if current status is CompilingRust")), |
this is a very long line, can something be done about it?
| Ok(result) | ||
| } | ||
|
|
||
| /// Retrieves a list of pipelines across all tenants that are candidates to have SQL Rust |
SQL Rust? Not just Rust?
The function name is just "sql".
what is "cleared?"
| worker_id: usize, | ||
| total_workers: usize, | ||
| ) -> Result<Vec<(TenantId, ExtendedPipelineDescrMonitoring)>, DBError> { | ||
| // See `get_next_sql_compilation` for the worker id matching function explanation. |
what is "function explanation"?
| OR | ||
| (p.platform_version != $1 AND (p.program_status = 'pending' OR p.program_status = 'compiling_sql')) | ||
| ) | ||
| AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3 |
This is a very specific formula, which I suspect has to match everywhere. Can you pull it into a reusable function somehow?
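As a rough illustration of what a single reusable Rust counterpart of that SQL expression could look like, here is a sketch. It assumes that `::bit(64)::bigint` reinterprets the 64 bits as a signed two's-complement value, and it takes the UUID as raw bytes to stay dependency-free. All function names are hypothetical and are not the ones in the PR.

```rust
/// Interprets the first 8 bytes of a UUID as a big-endian signed 64-bit
/// integer. Assumption: this mirrors
/// `('x' || <first 16 hex digits>)::bit(64)::bigint` in PostgreSQL.
fn uuid_prefix_as_i64(uuid_bytes: &[u8; 16]) -> i64 {
    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(&uuid_bytes[..8]);
    i64::from_be_bytes(prefix)
}

/// Mirrors `abs(prefix) % total_workers`. Returns `None` when there are no
/// workers, or when the prefix is `i64::MIN`, for which an absolute value
/// does not fit in a signed 64-bit integer.
fn worker_bucket(uuid_bytes: &[u8; 16], total_workers: u64) -> Option<u64> {
    if total_workers == 0 {
        return None;
    }
    let magnitude = uuid_prefix_as_i64(uuid_bytes).checked_abs()?;
    Some(magnitude as u64 % total_workers)
}

/// True if the pipeline with this UUID falls into the bucket of `worker_id`.
fn is_assigned_to_worker(uuid_bytes: &[u8; 16], worker_id: u64, total_workers: u64) -> bool {
    worker_bucket(uuid_bytes, total_workers) == Some(worker_id)
}
```

A unit test could then compare `worker_bucket` against the result of the SQL expression for a sample of UUIDs, including ones whose first byte is `0x80` or higher, since those are the values where a signed and an unsigned interpretation diverge.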
mythical-fred left a comment
APPROVE on 067aa2cbd3.
Threading explicit `row_lock: bool` through every `get_pipeline*` and tacking `FOR UPDATE` on the read-modify-write paths is the right move; the global API-server lock was always papering over the runner's separate write path. Two things I genuinely liked:
- The `platform_version` UPDATE in `set_current_platform_version_runner` now also keys on `tenant_id` (was id-only). That was a latent multi-tenant bug: well caught, and worth a one-line callout in the PR description so it isn't missed at squash time.
- `get_runner_pipeline` switched to an explicit `RepeatableRead + read_only` txn. Right answer; the runner only consumes the snapshot.
- `pipeline_concurrent_access_stall` and `pipeline_concurrent_access_deadlock` are exactly the right shape: `statement_timeout` to assert blocking on conflict, and a two-pipeline crossed-lock arrangement to assert PG's deadlock detector trips.
Soft (non-blocking) observations, all in `crates/pipeline-manager/src/db/operations/pipeline.rs`:
- `abs((... ::bit(64))::bigint) % $2` in `list_pipelines_across_all_tenants_clear_{sql,rust}_compilation`: Postgres `abs(int8)` on `bigint::MIN` raises `integer out of range`. With v7 UUIDs this is astronomically unlikely, but `abs((x::bit(64)::bigint) & x7FFFFFFFFFFFFFFF)` (mask the sign bit) avoids the edge entirely and matches the intent (you only want a non-negative bucket).
- `pipline_uuid_to_u64` + `is_pipeline_assigned_to_worker` are now `#[allow(dead_code)]`: keeping two implementations of the worker-assignment formula (one SQL inline, one Rust) invites drift. If they stay, please add a small unit test that asserts the Rust function and the SQL `abs((...bit(64))::bigint) % N` agree for a sample of UUIDs; that's the only thing preventing a silent split next time someone tweaks one side. (Also: the function name `pipline_uuid_to_u64` has a typo, surface area for a future grep miss.)
- `pipeline_concurrent_access_deadlock` assertion: `assert!(!(t1_error.is_some() && t2_error.is_some()))` is necessary but doesn't enforce that exactly one aborted. In a healthy deadlock detector, the surviving transaction must commit successfully; the test then unwraps the surviving txn's `commit()` (`txn1.commit().await.unwrap()` inside the spawn, and `txn2.commit().await.unwrap()` after). If somehow both aborted, you'd panic with `unreachable!()` rather than fail with a clean assertion. An `assert!(t1_error.is_some() ^ t2_error.is_some(), "exactly one txn must abort on deadlock")` reads better and gives a clear failure message.
- `FOR UPDATE` formatted via runtime `format!` in the four `get_pipeline*` helpers: cheap (no allocation hot path), but you now have two `prepare_cached` variants per query (with and without `FOR UPDATE`). Worth a one-line comment on `prepare_cached` saying both shapes will be cached separately so reviewers don't worry the cache is getting clobbered.
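To spell out the difference between the two deadlock assertions discussed above, here is a small sketch. The helper names are hypothetical; `t1_error` and `t2_error` stand for the optional errors collected from the two transactions, with the error type left generic because the test's actual type is not shown here.

```rust
/// The existing check: passes unless BOTH transactions aborted.
/// Note that it also passes when neither aborted.
fn not_both_aborted<E>(t1_error: &Option<E>, t2_error: &Option<E>) -> bool {
    !(t1_error.is_some() && t2_error.is_some())
}

/// The stricter check suggested in the review: passes only when exactly one
/// of the two transactions aborted. XOR on booleans expresses "one but not both".
fn exactly_one_aborted<E>(t1_error: &Option<E>, t2_error: &Option<E>) -> bool {
    t1_error.is_some() ^ t2_error.is_some()
}
```

The two checks differ only in the case where neither transaction reports an error: `not_both_aborted` accepts it, while `exactly_one_aborted` rejects it, which is the outcome a deadlock test wants to flag.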
Nothing here is a blocker — the PR is solid and the tests would have caught the bug it fixes.