Skip to content

pipeline-manager: fix row locking#6500

Open
snkas wants to merge 1 commit into
mainfrom
fix-locking
Open

pipeline-manager: fix row locking#6500
snkas wants to merge 1 commit into
mainfrom
fix-locking

Conversation

@snkas

@snkas snkas commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

When a table row is retrieved for update purposes, the row itself is now locked by adding FOR UPDATE to the query. Although the current global lock mechanism on the database connection of the API server prevents concurrent access, the runner can separately access/change the rows (although, only certain fields when they are in its domain). One such concurrent one is setting of the desired resources status, and the transition of current resources status based on it.

PR information

  • Unit tests added
  • Integration tests updated
  • No backward incompatible changes

When a table row is retrieved for update purposes, the row itself
is now locked by adding `FOR UPDATE` to the query. Although the current
global lock mechanism on the database connection of the API server
prevents concurrent access, the runner can separately access/change the
rows (although, only certain fields when they are in its domain).
One such concurrent one is setting of the desired resources status, and
the transition of current resources status based on it.

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
@snkas snkas requested a review from gz June 18, 2026 16:44
@snkas

snkas commented Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

event_id: PipelineMonitorEventId,
) -> Result<PipelineMonitorEvent, DBError> {
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name)
let pipeline_id = get_pipeline_for_monitoring(txn, tenant_id, &pipeline_name, false)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give some guidelines about when to use locking for other people?
It should probably be in the doc of the innermost functions which take this flag.

// Convert PipelineId UUID to u64 to match PostgreSQL's behavior.
// This uses the first 8 bytes of the UUID converted to a big-endian u64.
// This ensures consistent worker assignment between Rust and SQL implementations.
#[allow(dead_code)] // Might be useful in the future to have equivalent of SQL implementation

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment.

pipeline.id,
pipeline.program_version,
&ProgramStatus::SqlCompiled,
&Some(pipeline_complete.program_error.sql_compilation.clone().expect("program_error.sql_compilation must be present if current status is CompilingRust")),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a very long line, can something be done about it?

Ok(result)
}

/// Retrieves a list of pipelines across all tenants that are candidates to have SQL Rust

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL Rust? Not just Rust?
The function name is just "sql".
what is "cleared?"

worker_id: usize,
total_workers: usize,
) -> Result<Vec<(TenantId, ExtendedPipelineDescrMonitoring)>, DBError> {
// See `get_next_sql_compilation` for the worker id matching function explanation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is "function explanation"?

OR
(p.platform_version != $1 AND (p.program_status = 'pending' OR p.program_status = 'compiling_sql'))
)
AND (abs(('x' || substr(replace(p.id::text, '-', ''), 1, 16))::bit(64)::bigint) % $2) = $3

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very specific formula, which I suspect has to match everywhere. Can you pull it into a reusable function somehow?

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

APPROVE on 067aa2cbd3.

Threading explicit row_lock: bool through every get_pipeline* and tacking FOR UPDATE on the read-modify-write paths is the right move; the global API-server lock was always papering over the runner's separate write path. Two things I genuinely liked:

  • The platform_version UPDATE in set_current_platform_version_runner now also keys on tenant_id (was id-only). That was a latent multi-tenant bug — well caught and worth a one-line callout in the PR description so it isn't missed at squash time.
  • get_runner_pipeline switched to an explicit RepeatableRead + read_only txn. Right answer; the runner only consumes the snapshot.
  • pipeline_concurrent_access_stall and pipeline_concurrent_access_deadlock are exactly the right shape — statement_timeout to assert blocking on conflict, and a two-pipeline crossed-lock arrangement to assert PG's deadlock detector trips.

Soft (non-blocking) observations, all in crates/pipeline-manager/src/db/operations/pipeline.rs:

  1. abs((... ::bit(64))::bigint) % $2 in list_pipelines_across_all_tenants_clear_{sql,rust}_compilation: Postgres abs(int8) on bigint::MIN raises integer out of range. With v7 UUIDs this is astronomically unlikely, but abs((x::bit(64)::bigint) & x7FFFFFFFFFFFFFFF) (mask the sign bit) avoids the edge entirely and matches the intent (you only want a non-negative bucket).

  2. pipline_uuid_to_u64 + is_pipeline_assigned_to_worker are now #[allow(dead_code)]: keeping two implementations of the worker-assignment formula (one SQL inline, one Rust) invites drift. If they stay, please add a small unit test that asserts the Rust function and the SQL abs((...bit(64))::bigint) % N agree for a sample of UUIDs — that's the only thing preventing a silent split next time someone tweaks one side. (Also: the function name pipline_uuid_to_u64 has a typo, surface area for a future grep miss.)

  3. pipeline_concurrent_access_deadlock assertion: assert!(!(t1_error.is_some() && t2_error.is_some())) is necessary but doesn't enforce that exactly one aborted. In a healthy deadlock detector, the surviving transaction must commit successfully — the test then unwraps the surviving txn's commit() (txn1.commit().await.unwrap() inside the spawn, and txn2.commit().await.unwrap() after). If somehow both aborted, you'd panic with unreachable!() rather than fail with a clean assertion. A assert!(t1_error.is_some() ^ t2_error.is_some(), "exactly one txn must abort on deadlock") reads better and gives a clear failure message.

  4. FOR UPDATE formatted via runtime format! in the four get_pipeline* helpers: cheap (no allocation hot path), but you now have two prepare_cached variants per query (with and without FOR UPDATE). Worth a one-line comment on prepare_cached saying both shapes will be cached separately so reviewers don't worry the cache is getting clobbered.

Nothing here is a blocker — the PR is solid and the tests would have caught the bug it fixes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants