Commit 0b85126

feat: pre-gate idempotency-key claim serialises same-key triggers
Closes the PG+buffer race during the mollifier gate-transition window.
Plan: _plans/2026-05-21-mollifier-idempotency-claim.md

redis-worker:
- New MollifierBuffer methods + atomic Lua: claimIdempotency (SETNX-with-TTL returning claimed/pending/resolved), publishClaim, releaseClaim, readClaim. Separate key namespace mollifier:claim:* to keep isolated from the B6a buffered-side mollifier:idempotency:* lookup.

webapp:
- New apps/webapp/app/v3/mollifier/idempotencyClaim.server.ts wraps the buffer primitives with a wait/poll loop. Returns claimed / resolved / timed_out. Fail-open on buffer outage so a transient Redis blip doesn't 500 the trigger hot path.
- IdempotencyKeyConcern.handleTriggerRequest now consults the claim after the existing PG-findFirst + buffer.lookupIdempotency cache checks miss. Skipped for resumeParentOnCompletion (triggerAndWait bypasses the mollifier gate via F4 and is PG-canonical anyway). When we own the claim, the result's new `claim` field signals the caller to publish on success / release on failure.
- RunEngineTriggerTaskService.callV2 wraps the trigger pipeline in a try/catch that publishes the winning runId or releases the claim depending on outcome. The publish updates the claim key so waiters polling for our key resolve to our runId.

Validated end-to-end:
- scripts/mollifier-challenge/04-idempotency-collision.sh runs cold-gate (no pre-warm) with 30 concurrent same-key triggers and converges on 1 runId / 1 isCached:false. Before this fix the same test produced 2 race-winners.
- 13 unit tests covering claimed/resolved/pending/timed_out paths, fail-open behaviour, abort signal, publishClaim, releaseClaim.
- All 94 webapp mollifier tests still green.
1 parent d499aa5 commit 0b85126

9 files changed

Lines changed: 668 additions & 28 deletions

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Add pre-gate idempotency-claim primitives to `MollifierBuffer`: `claimIdempotency` (atomic SETNX-with-TTL claim returning `claimed` / `pending` / `resolved`), `publishClaim` (publish winning runId so waiters resolve), `releaseClaim` (DEL claim on pipeline error), `readClaim` (used by the webapp's wait/poll loop). Uses a separate key namespace `mollifier:claim:{env}:{task}:{key}` to keep isolated from the B6a buffer-side `mollifier:idempotency:...` lookup.
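
The three claim outcomes described above can be illustrated with a small in-memory model. This is only a sketch of the state machine: the real primitives run against Redis through an atomic Lua script that is not shown in this diff, and the class name `InMemoryClaimStore`, the `Entry` shape, the injected clock, and the choice to make `publishClaim` a no-op on an expired claim are all assumptions made for illustration.

```typescript
// In-memory model of the claim lifecycle. NOT the real MollifierBuffer:
// names and storage layout below are hypothetical.
type ClaimResult =
  | { kind: "claimed" }
  | { kind: "pending" }
  | { kind: "resolved"; runId: string };

type Entry = { runId?: string; expiresAtMs: number };

class InMemoryClaimStore {
  private entries = new Map<string, Entry>();
  private now: () => number;

  // `now` is injectable so expiry can be exercised without real timers.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  // Key layout follows the changeset: mollifier:claim:{env}:{task}:{key}.
  static key(envId: string, task: string, idempotencyKey: string): string {
    return `mollifier:claim:${envId}:${task}:${idempotencyKey}`;
  }

  // Returns the entry only if it has not expired; drops expired entries.
  private live(key: string): Entry | undefined {
    const entry = this.entries.get(key);
    if (entry && entry.expiresAtMs <= this.now()) {
      this.entries.delete(key);
      return undefined;
    }
    return entry;
  }

  // First caller gets "claimed"; later callers see "pending" until a
  // runId is published, then "resolved".
  claimIdempotency(key: string, ttlSeconds: number): ClaimResult {
    const existing = this.live(key);
    if (!existing) {
      this.entries.set(key, { expiresAtMs: this.now() + ttlSeconds * 1000 });
      return { kind: "claimed" };
    }
    return existing.runId !== undefined
      ? { kind: "resolved", runId: existing.runId }
      : { kind: "pending" };
  }

  // Assumption: publishing onto an expired claim does nothing.
  publishClaim(key: string, runId: string): void {
    const existing = this.live(key);
    if (existing) existing.runId = runId;
  }

  releaseClaim(key: string): void {
    this.entries.delete(key);
  }
}
```

In this model a released or expired claim lets the next caller win a fresh `claimed` result, which mirrors the retry behaviour the changeset describes for pipeline errors.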
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+---
+area: webapp
+type: fix
+---
+
+Close the PG+buffer idempotency-key race during the mollifier gate-transition window. Without this, two simultaneous same-key triggers arriving as the gate trips could each become race-winners (one PG, one buffer) — the customer would receive two distinct runIds for the same idempotency key, and operations on the buffered "loser" would silently vanish on drain. Design: `_plans/2026-05-21-mollifier-idempotency-claim.md`.
+
+`IdempotencyKeyConcern.handleTriggerRequest` now does a pre-gate Redis `SETNX` claim after the existing PG + buffer cache checks miss. All same-key triggers serialise through this claim before the gate decides PG-passthrough vs mollify; losers poll until the winner publishes its runId, then return that runId with `isCached:true`. Skipped for `resumeParentOnCompletion` (triggerAndWait bypasses the gate via F4 and is PG-canonical).
+
+`RunEngineTriggerTaskService.callV2` wraps the trigger pipeline in a try/catch around the claim: on success, the winning runId is published to the claim key so waiters resolve; on any pipeline error, the claim is released so the next claimant can retry. Failure to publish/release is logged but non-fatal — the claim TTL (default 30s) is the safety net.
+
+Verified by `scripts/mollifier-challenge/04-idempotency-collision.sh`: 30 cold-gate same-key triggers (no pre-warm) now converge on one runId, one `isCached:false` response, 29 `isCached:true`. Before this fix the same test produced 2 unique runIds and 2 `isCached:false` responses.
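
The wait/poll behaviour for losing triggers might look roughly like the sketch below. The real `claimOrAwait` lives in `idempotencyClaim.server.ts`, which is not part of the visible diff, so everything here is an assumption: the `tryClaim` and `sleep` dependencies, the attempt-count timeout (the real code may use a deadline and an abort signal), and in particular the `unavailable` outcome, which is a hypothetical way of making the fail-open path visible.

```typescript
type ClaimResult =
  | { kind: "claimed" }
  | { kind: "pending" }
  | { kind: "resolved"; runId: string };

// "unavailable" is hypothetical; the diff does not show how the real
// wrapper reports a buffer outage to its caller.
type WaitOutcome =
  | { kind: "claimed" }
  | { kind: "resolved"; runId: string }
  | { kind: "timed_out" }
  | { kind: "unavailable" };

type WaitDeps = {
  tryClaim: () => Promise<ClaimResult>; // stand-in for the buffer call
  sleep: (ms: number) => Promise<void>; // injected so tests need no timers
};

async function claimOrAwaitSketch(
  deps: WaitDeps,
  opts: { maxAttempts: number; pollIntervalMs: number },
): Promise<WaitOutcome> {
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    let result: ClaimResult;
    try {
      result = await deps.tryClaim();
    } catch {
      // Fail open: a store outage must not fail the trigger request.
      return { kind: "unavailable" };
    }
    if (result.kind !== "pending") {
      // Either we won the claim, or the winner has published its runId.
      return result;
    }
    await deps.sleep(opts.pollIntervalMs);
  }
  return { kind: "timed_out" };
}
```

Re-attempting the claim on every iteration, rather than only reading it, means a waiter can take over when the previous owner releases the claim after a pipeline error.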

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 99 additions & 1 deletion
@@ -2,15 +2,38 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
 import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
 import { logger } from "~/services/logger.server";
 import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
+import { ServiceValidationError } from "~/v3/services/common.server";
 import type { RunEngine } from "~/v3/runEngine.server";
 import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus";
 import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
 import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
+import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
 import type { TraceEventConcern, TriggerTaskRequest } from "../types";
 
+// Claim ownership context returned to the caller when the
+// IdempotencyKeyConcern won a pre-gate claim. Caller MUST publish the
+// winning runId on pipeline success (`publishClaim`) or release the
+// claim on failure (`releaseClaim`).
+export type ClaimedIdempotency = {
+  envId: string;
+  taskIdentifier: string;
+  idempotencyKey: string;
+};
+
 export type IdempotencyKeyConcernResult =
   | { isCached: true; run: TaskRun }
-  | { isCached: false; idempotencyKey?: string; idempotencyKeyExpiresAt?: Date };
+  | {
+      isCached: false;
+      idempotencyKey?: string;
+      idempotencyKeyExpiresAt?: Date;
+      // Set when this trigger holds a pre-gate claim. The caller's
+      // trigger pipeline MUST resolve the claim by either publishing
+      // the runId on success or releasing on failure. Undefined when
+      // the request has no idempotency key, when the buffer is
+      // unavailable, or when the request is a triggerAndWait (claim
+      // path skipped per plan doc).
+      claim?: ClaimedIdempotency;
+    };
 
 export class IdempotencyKeyConcern {
   constructor(
@@ -195,6 +218,81 @@ export class IdempotencyKeyConcern {
       return { isCached: true, run: existingRun };
     }
 
+    // Pre-gate claim — closes the PG+buffer race during gate transition
+    // (see _plans/2026-05-21-mollifier-idempotency-claim.md). All
+    // same-key triggers serialise here before evaluateGate decides
+    // PG-pass-through vs mollify. Skipped for triggerAndWait
+    // (resumeParentOnCompletion) — that path bypasses the gate via F4
+    // and its existing PG-side dedup is sufficient.
+    if (!request.body.options?.resumeParentOnCompletion) {
+      const ttlSeconds = Math.max(
+        1,
+        Math.min(
+          30,
+          Math.ceil((idempotencyKeyExpiresAt.getTime() - Date.now()) / 1000),
+        ),
+      );
+      const outcome = await claimOrAwait({
+        envId: request.environment.id,
+        taskIdentifier: request.taskId,
+        idempotencyKey,
+        ttlSeconds,
+      });
+      if (outcome.kind === "resolved") {
+        // Another concurrent trigger committed first. Re-resolve via the
+        // existing checks: writer-side PG findFirst first (defeats
+        // replica lag), then buffer fallback for the buffered case.
+        const writerRun = await this.prisma.taskRun.findFirst({
+          where: {
+            runtimeEnvironmentId: request.environment.id,
+            idempotencyKey,
+            taskIdentifier: request.taskId,
+          },
+          include: { associatedWaitpoint: true },
+        });
+        if (writerRun) {
+          return { isCached: true, run: writerRun };
+        }
+        const buffered = await this.findBufferedRunWithIdempotency(
+          request.environment.id,
+          request.environment.organizationId,
+          request.taskId,
+          idempotencyKey,
+        );
+        if (buffered) {
+          return { isCached: true, run: buffered };
+        }
+        // Claim resolved to a runId nothing can find — likely the
+        // claimant errored after publish, or the row TTL'd out. Log
+        // and fall through to a fresh trigger.
+        logger.warn("idempotency claim resolved but runId not findable", {
+          envId: request.environment.id,
+          taskIdentifier: request.taskId,
+          claimedRunId: outcome.runId,
+        });
+      }
+      if (outcome.kind === "timed_out") {
+        throw new ServiceValidationError(
+          "Idempotency claim resolution timed out",
+          503,
+        );
+      }
+      if (outcome.kind === "claimed") {
+        // Caller MUST publish/release. Signalled via the result's
+        // `claim` field.
+        return {
+          isCached: false,
+          idempotencyKey,
+          idempotencyKeyExpiresAt,
+          claim: {
+            envId: request.environment.id,
+            taskIdentifier: request.taskId,
+            idempotencyKey,
+          },
+        };
+      }
+    }
+
     return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
   }
 }
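
The `ttlSeconds` computation in the hunk above clamps the remaining lifetime of the idempotency key into the range 1 to 30 seconds. Pulled out as a standalone function it is easier to check at the boundaries. The function name `claimTtlSeconds` and the explicit `nowMs` parameter are illustrative additions; the diff computes the value inline against `Date.now()`.

```typescript
// Clamp the claim TTL to [1, maxSeconds], based on how long the
// idempotency key itself has left to live.
function claimTtlSeconds(
  idempotencyKeyExpiresAt: Date,
  nowMs: number,
  maxSeconds: number = 30,
): number {
  const remainingSeconds = Math.ceil(
    (idempotencyKeyExpiresAt.getTime() - nowMs) / 1000,
  );
  // Lower bound 1: a key that is about to expire (or already has) still
  // gets a non-zero TTL, so the claim entry is always set with an expiry.
  // Upper bound maxSeconds: a long-lived key never pins the claim longer
  // than the safety-net window.
  return Math.max(1, Math.min(maxSeconds, remainingSeconds));
}

// 5.5 s remaining rounds up to 6.
const shortLived = claimTtlSeconds(new Date(5_500), 0);
// An hour remaining is capped at 30.
const longLived = claimTtlSeconds(new Date(3_600_000), 0);
// An already-expired key is floored at 1.
const expired = claimTtlSeconds(new Date(-1_000), 0);
```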

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 48 additions & 4 deletions
@@ -30,7 +30,14 @@ import type {
   TriggerTaskServiceResult,
 } from "../../v3/services/triggerTask.server";
 import { clampMaxDuration } from "../../v3/utils/maxDuration";
-import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server";
+import {
+  IdempotencyKeyConcern,
+  type ClaimedIdempotency,
+} from "../concerns/idempotencyKeys.server";
+import {
+  publishClaim as publishMollifierClaim,
+  releaseClaim as releaseMollifierClaim,
+} from "~/v3/mollifier/idempotencyClaim.server";
 import type {
   PayloadProcessor,
   QueueManager,
@@ -124,7 +131,15 @@ export class RunEngineTriggerTaskService {
     options?: TriggerTaskServiceOptions;
     attempt?: number;
   }): Promise<TriggerTaskServiceResult | undefined> {
-    return await startSpan(this.tracer, "RunEngineTriggerTaskService.call()", async (span) => {
+    // Pre-gate idempotency-claim ownership. Set inside the span when
+    // `IdempotencyKeyConcern.handleTriggerRequest` returns `claim:
+    // {...}`. The try/catch below resolves it once the span finishes.
+    let idempotencyClaim: ClaimedIdempotency | undefined;
+    try {
+      const result = await startSpan(
+        this.tracer,
+        "RunEngineTriggerTaskService.call()",
+        async (span) => {
       span.setAttribute("taskId", taskId);
       span.setAttribute("attempt", attempt);
 
@@ -247,7 +262,16 @@ export class RunEngineTriggerTaskService {
           return idempotencyKeyConcernResult;
         }
 
-        const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult;
+        const { idempotencyKey, idempotencyKeyExpiresAt, claim: claimResult } =
+          idempotencyKeyConcernResult;
+
+        // If we own an idempotency claim, the trigger pipeline below MUST
+        // resolve it — publish on success so waiters see our runId,
+        // release on error so the next claimant can retry. Stored in an
+        // outer scope so the try/catch at the bottom of `callV2` can act
+        // on whichever return path or throw the pipeline takes. Plan doc:
+        // _plans/2026-05-21-mollifier-idempotency-claim.md
+        idempotencyClaim = claimResult;
 
         if (idempotencyKey) {
           await this.triggerRacepointSystem.waitForRacepoint({
@@ -604,7 +628,27 @@ export class RunEngineTriggerTaskService {
 
           throw error;
         }
-    });
+        },
+      );
+      // Pipeline returned successfully — publish the claim if we held
+      // one. Waiters polling for our key resolve to this runId.
+      if (idempotencyClaim && result?.run?.friendlyId) {
+        await publishMollifierClaim({
+          envId: idempotencyClaim.envId,
+          taskIdentifier: idempotencyClaim.taskIdentifier,
+          idempotencyKey: idempotencyClaim.idempotencyKey,
+          runId: result.run.friendlyId,
+        });
+      }
+      return result;
+    } catch (err) {
+      // Pipeline threw — release the claim so the next claimant can
+      // retry. Re-throw so the caller sees the original error.
+      if (idempotencyClaim) {
+        await releaseMollifierClaim(idempotencyClaim);
+      }
+      throw err;
+    }
   }
 
   // Build the engine.trigger() input object from the values gathered during
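
The publish-on-success / release-on-failure contract in `callV2` can be sketched as a generic wrapper. This is a variation for illustration, not the code in the diff: the diff awaits the publish inside the `try` and relies on the helpers in `idempotencyClaim.server.ts` (not shown) to keep publish/release failures non-fatal, whereas this sketch swallows those failures in the wrapper itself. `runWithClaim`, `ClaimResolver`, and the `runId` field on the result are hypothetical names.

```typescript
type Claim = { envId: string; taskIdentifier: string; idempotencyKey: string };

// Hypothetical interface standing in for publishClaim / releaseClaim.
interface ClaimResolver {
  publish(claim: Claim, runId: string): Promise<void>;
  release(claim: Claim): Promise<void>;
}

async function runWithClaim<T extends { runId?: string }>(
  claim: Claim | undefined,
  resolver: ClaimResolver,
  pipeline: () => Promise<T>,
): Promise<T> {
  let result: T;
  try {
    result = await pipeline();
  } catch (err) {
    // Pipeline threw: release so the next claimant can retry. A failed
    // release is swallowed; the claim TTL remains the safety net.
    if (claim) await resolver.release(claim).catch(() => undefined);
    throw err; // the caller always sees the original pipeline error
  }
  // Pipeline succeeded: publish so waiters resolve to this runId. A
  // failed publish does not fail the request that already succeeded.
  if (claim && result.runId) {
    await resolver.publish(claim, result.runId).catch(() => undefined);
  }
  return result;
}
```

Keeping the publish outside the `try` means a publish failure can never trigger a release of a claim whose run was in fact created. If the pipeline returns without a run ID, neither branch fires and the claim simply expires with its TTL.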
