perf(run-engine): merge dequeue snapshot creation into taskRun.update transaction [TRI-8450]#3395
perf(run-engine): merge dequeue snapshot creation into taskRun.update transaction [TRI-8450]#3395devin-ai-integration[bot] wants to merge 3 commits intomainfrom
Conversation
… transaction Nest the TaskRunExecutionSnapshot create inside the preceding taskRun.update() call in the dequeue flow, reducing 2 explicit BEGIN/COMMIT transactions to 1 per dequeue operation. This follows the same pattern already used in the completion path (runAttemptSystem.ts:735) and trigger path (engine/index.ts:674-686). Side effects (heartbeat enqueue, executionSnapshotCreated event) are kept outside the transaction and fed the result of the merged write. Also adds a public enqueueHeartbeatIfNeeded() method to ExecutionSnapshotSystem for reuse by other flows that will adopt the same merged pattern. Refs: TRI-8450 Co-Authored-By: Eric Allam <eallam@icloud.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
|
|
Thanks for your contribution! We require all external PRs to be opened in draft status first so you can address CodeRabbit review comments and ensure CI passes before requesting a review. Please re-open this PR as a draft. See CONTRIBUTING.md for details. |
Pre-generate the snapshot ID with SnapshotId.generate() and construct the event/return data from values already available in scope. This removes the extra DB read that was added in the initial merge commit. Co-Authored-By: Eric Allam <eallam@icloud.com>
….generate() Co-Authored-By: Eric Allam <eallam@icloud.com>
Summary
Nests the
TaskRunExecutionSnapshotcreation inside thetaskRun.update()Prisma call in the dequeue flow, reducing 2 DB commits → 1 per dequeue operation. This is the highest-volume of the five unmerged flows identified in TRI-8450 (~9,200 commits/sec on the engine service).Pattern: Follows the same nested-write approach already used in the completion path (
runAttemptSystem.ts:735) and trigger path (engine/index.ts:674).Changes:
dequeueSystem.ts: Moved snapshot creation intoexecutionSnapshots: { create: {...} }within the existingtaskRun.update(). Pre-generates the snapshot ID viagenerateInternalId()(plain cuid, matching what Prisma's@default(cuid())produces) so the event emission, heartbeat enqueue, and return value can all be constructed from data already in scope — no extra DB read needed after the merged write.SnapshotId.toFriendlyId()is used only for the return value'sfriendlyIdfield, matching the originalcreateExecutionSnapshotbehavior.executionSnapshotSystem.ts: Added publicenqueueHeartbeatIfNeeded()method that exposes the heartbeat scheduling logic (previously only available internally viacreateExecutionSnapshot). This is needed becausePENDING_EXECUTINGrequires a heartbeat, unlike theFINISHEDstatus in the completion reference pattern. This method is reusable by future merge targets (retry-immediate, checkpoint, cancel, requeue).Net DB change per dequeue: eliminates 1 write transaction (the separate
TaskRunExecutionSnapshot.create). No extra reads added — the snapshot ID is pre-generated and theexecutionSnapshotCreatedevent payload is constructed inline from values already available in the closure.Review & Testing Checklist for Human
executionSnapshotCreatedevent is now built inline (not read back from DB). Confirm the field values (runStatus: "PENDING",attemptNumber,checkpointId,workerId,runnerId,completedWaitpointIds) match what Prisma actually writes. A mismatch here would be silent — event consumers would get stale/wrong data.attemptNumbersource is equivalent: Old code usedlockedTaskRun.attemptNumber(post-update result). New code usesresult.run.attemptNumber(pre-update). ThetaskRun.update()data payload does NOT includeattemptNumber, so they should be identical — but confirm this assumption holds for all dequeue scenarios (e.g. retried runs).isValiddefaults totruein schema: The oldcreateExecutionSnapshotexplicitly setisValid: error ? false : true. The nested create omitsisValid(no error in the dequeue happy path). Confirm the Prisma schema default forTaskRunExecutionSnapshot.isValidistrue.runStatus: "PENDING"hardcoding matches the mapping: The old code passedlockedTaskRun.status("DEQUEUED") tocreateExecutionSnapshot, which mapped it to "PENDING" viarun.status === "DEQUEUED" ? "PENDING" : run.status. The new code hardcodes"PENDING"directly. This is correct but brittle ifstatusever changes from "DEQUEUED" to something else upstream.completedWaitpointsconnect + order logic: The nested create replicates the connect/order logic fromcreateExecutionSnapshot(lines 387-393). Verify thesnapshot.completedWaitpointstype providesidandindexfields compatible with this usage.checkpointin return value: The return now usessnapshot.checkpoint(from the previous snapshot) instead of reading the newly-created snapshot's checkpoint relation. SincecheckpointIdis passed through unchanged, they should be identical — but worth a sanity check.Recommended test plan: deploy to staging, run the
sample_pg_activity.pysampler for a 5-minute window, and verify the COMMIT count drop on the engine service + proportionalIO:XactSyncreduction.Notes
enqueueHeartbeatIfNeededmethod is deliberately designed for reuse by those follow-up PRs.priority.test.tsfailure in shard 7 is a flaky ordering assertion unrelated to this change (it comparesfriendlyIdvalues in dequeue order). Theauditcheck is also pre-existing/unrelated.Link to Devin session: https://app.devin.ai/sessions/034fe0e7224f49278a2de260203e1377
Requested by: @ericallam