feat(webapp,run-engine): mollifier drainer replay + stale sweep + cancelled-run engine API #3754
Draft
d-cs wants to merge 5 commits into mollifier-phase-3-trigger from mollifier-phase-3-replay
Commits
966cd4f feat(webapp,run-engine): mollifier drainer replay + stale sweep + can… (d-cs)
fb7cec5 fix(webapp,run-engine): replay-layer code-review follow-ups (d-cs)
3453923 test(webapp): isolate mollifierDrainerWorker test from runEngine sing… (d-cs)
c6fa61f test(webapp): bump mollifierStaleSweep redisTest timeouts to 20s (d-cs)
242ba73 fix(webapp,run-engine): replay-layer Devin follow-ups (d-cs)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|  | ||
| Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows. |
170 changes: 170 additions & 0 deletions
apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| import { context, trace, TraceFlags } from "@opentelemetry/api"; | ||
| import type { RunEngine } from "@internal/run-engine"; | ||
| import type { PrismaClientOrTransaction } from "@trigger.dev/database"; | ||
| import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker"; | ||
| import { startSpan } from "~/v3/tracing.server"; | ||
| import type { MollifierSnapshot } from "./mollifierSnapshot.server"; | ||
|  | ||
| const tracer = trace.getTracer("mollifier-drainer"); | ||
|  | ||
| export function isRetryablePgError(err: unknown): boolean { | ||
| if (!(err instanceof Error)) return false; | ||
| const msg = err.message ?? ""; | ||
| // Prisma surfaces P1001 ("Can't reach database server") via two | ||
| // different error classes — `PrismaClientKnownRequestError` exposes | ||
| // it as `err.code`, `PrismaClientInitializationError` exposes it as | ||
| // `err.errorCode`. Check both so reconnection-time errors retry | ||
| // regardless of which class fires. | ||
| const code = (err as { code?: string }).code; | ||
| const errorCode = (err as { errorCode?: string }).errorCode; | ||
| if (code === "P2024") return true; | ||
| if (code === "P1001" || errorCode === "P1001") return true; | ||
| if (msg.includes("Can't reach database server")) return true; | ||
| if (msg.includes("Connection lost")) return true; | ||
| if (msg.includes("ECONNRESET")) return true; | ||
| return false; | ||
| } | ||
|  | ||
| export function createDrainerHandler(deps: { | ||
| engine: RunEngine; | ||
| prisma: PrismaClientOrTransaction; | ||
| }): MollifierDrainerHandler<MollifierSnapshot> { | ||
| return async (input) => { | ||
| const dwellMs = Date.now() - input.createdAt.getTime(); | ||
|  | ||
| // Re-attach to the trace started by the caller's mollifier.queued span | ||
| // (its traceId + spanId were captured into the snapshot at buffer time). | ||
| // Without this the drainer would emit mollifier.drained in a brand-new | ||
| // trace and the engine.trigger instrumentation would inherit an empty | ||
| // active context — leaving the run-detail page with only the root span. | ||
| const snapshotTraceId = | ||
| typeof input.payload.traceId === "string" ? input.payload.traceId : undefined; | ||
| const snapshotSpanId = | ||
| typeof input.payload.spanId === "string" ? input.payload.spanId : undefined; | ||
|  | ||
| const parentContext = | ||
| snapshotTraceId && snapshotSpanId | ||
| ? trace.setSpanContext(context.active(), { | ||
| traceId: snapshotTraceId, | ||
| spanId: snapshotSpanId, | ||
| traceFlags: TraceFlags.SAMPLED, | ||
| isRemote: true, | ||
| }) | ||
| : context.active(); | ||
|  | ||
| // Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call | ||
| // landed on this entry while it was QUEUED, the snapshot carries | ||
| // `cancelledAt` + `cancelReason`. Skip the normal materialise path | ||
| // and write a CANCELED PG row directly. The existing runCancelled | ||
| // handler writes the TaskEvent. | ||
| const cancelledAtStr = | ||
| typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined; | ||
| if (cancelledAtStr) { | ||
| const cancelReason = | ||
| typeof input.payload.cancelReason === "string" | ||
| ? input.payload.cancelReason | ||
| : "Canceled by user"; | ||
| await context.with(parentContext, async () => { | ||
| await startSpan(tracer, "mollifier.drained.cancelled", async (span) => { | ||
| span.setAttribute("mollifier.drained", true); | ||
| span.setAttribute("mollifier.dwell_ms", dwellMs); | ||
| span.setAttribute("mollifier.attempts", input.attempts); | ||
| span.setAttribute("mollifier.run_friendly_id", input.runId); | ||
| span.setAttribute("mollifier.cancel_bifurcation", true); | ||
| span.setAttribute("taskRunId", input.runId); | ||
| await deps.engine.createCancelledRun( | ||
| { | ||
| snapshot: input.payload as any, | ||
| cancelledAt: new Date(cancelledAtStr), | ||
| cancelReason, | ||
| }, | ||
| deps.prisma, | ||
| ); | ||
| }); | ||
| }); | ||
| return; | ||
| } | ||
|  | ||
| await context.with(parentContext, async () => { | ||
| await startSpan(tracer, "mollifier.drained", async (span) => { | ||
| span.setAttribute("mollifier.drained", true); | ||
| span.setAttribute("mollifier.dwell_ms", dwellMs); | ||
| span.setAttribute("mollifier.attempts", input.attempts); | ||
| span.setAttribute("mollifier.run_friendly_id", input.runId); | ||
| span.setAttribute("taskRunId", input.runId); | ||
|  | ||
| try { | ||
| await deps.engine.trigger(input.payload as any, deps.prisma); | ||
| } catch (err) { | ||
| // The retryable-PG class re-throws so the drainer's outer | ||
| // worker loop can `buffer.requeue` (handled in | ||
| // `MollifierDrainer.drainOne`). For non-retryable failures we | ||
| // write a terminal SYSTEM_FAILURE row to PG via the engine's | ||
| // existing `createFailedTaskRun` (used by batch-trigger for | ||
| // the same purpose) so the customer sees the run in their | ||
| // dashboard / SDK instead of silently losing it when the | ||
| // buffer entry TTLs out. If THAT insert also fails (PG truly | ||
| // unreachable), rethrow so the drainer's outer catch falls | ||
| // through to its existing `buffer.fail` terminal-marker path. | ||
| if (isRetryablePgError(err)) { | ||
| throw err; | ||
| } | ||
| const reason = err instanceof Error ? err.message : String(err); | ||
| span.setAttribute("mollifier.terminal_failure_reason", reason); | ||
| const snapshot = input.payload as Record<string, unknown>; | ||
| const env = snapshot.environment as | ||
| | { | ||
| id: string; | ||
| type: any; | ||
| project: { id: string }; | ||
| organization: { id: string }; | ||
| } | ||
| | undefined; | ||
| if (!env) { | ||
| // Snapshot too malformed to even construct a TaskRun row. | ||
| // Drainer's outer catch will buffer.fail this entry. | ||
| throw err; | ||
| } | ||
| try { | ||
| await deps.engine.createFailedTaskRun({ | ||
| friendlyId: input.runId, | ||
| environment: env, | ||
| taskIdentifier: String(snapshot.taskIdentifier ?? ""), | ||
| payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined, | ||
| payloadType: | ||
| typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined, | ||
| error: { | ||
| type: "STRING_ERROR", | ||
| raw: `Mollifier drainer terminal failure: ${reason}`, | ||
| }, | ||
| parentTaskRunId: | ||
| typeof snapshot.parentTaskRunId === "string" | ||
| ? snapshot.parentTaskRunId | ||
| : undefined, | ||
| rootTaskRunId: | ||
| typeof snapshot.rootTaskRunId === "string" | ||
| ? snapshot.rootTaskRunId | ||
| : undefined, | ||
| depth: typeof snapshot.depth === "number" ? snapshot.depth : 0, | ||
| resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true, | ||
| traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined, | ||
| spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, | ||
| taskEventStore: | ||
| typeof snapshot.taskEventStore === "string" | ||
| ? snapshot.taskEventStore | ||
| : undefined, | ||
| queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined, | ||
| lockedQueueId: | ||
| typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined, | ||
| }); | ||
| } catch (writeErr) { | ||
| // Class A — PG itself is failing. Rethrow the original | ||
| // error so the drainer falls back to buffer.fail. Include | ||
| // the write error in the log line at the drainer layer. | ||
| throw err; | ||
| } | ||
| } | ||
| }); | ||
| }); | ||
| }; | ||
| } | ||
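
The error handling in the `catch` block above reduces to three tiers: retryable PG errors are rethrown for requeue, other failures get a terminal `SYSTEM_FAILURE` row, and if that write also fails the original error is rethrown so the drainer can fall back to `buffer.fail`. The following is a minimal sketch of that decision flow, not the PR's implementation. `DrainOutcome`, `Deps`, and `classifyDrain` are hypothetical names, the two callbacks are stand-ins for `engine.trigger` and `engine.createFailedTaskRun`, and the sketch returns a label where the real handler rethrows and leaves the requeue or fail decision to the drainer. Only `isRetryablePgError` mirrors the diff.

```typescript
// Hypothetical outcome labels; the real handler throws or returns void instead.
type DrainOutcome = "materialised" | "requeue" | "failed_row_written" | "buffer_fail";

// Mirrors the classifier in the diff: Prisma exposes P1001 as `code` on one
// error class and as `errorCode` on another, so both fields are checked.
function isRetryablePgError(err: unknown): boolean {
  if (!(err instanceof Error)) return false;
  const msg = err.message ?? "";
  const code = (err as { code?: string }).code;
  const errorCode = (err as { errorCode?: string }).errorCode;
  if (code === "P2024") return true;
  if (code === "P1001" || errorCode === "P1001") return true;
  if (msg.includes("Can't reach database server")) return true;
  if (msg.includes("Connection lost")) return true;
  if (msg.includes("ECONNRESET")) return true;
  return false;
}

// Stand-ins (assumptions) for engine.trigger and engine.createFailedTaskRun.
type Deps = {
  trigger: () => Promise<void>;
  writeFailedRow: (reason: string) => Promise<void>;
};

async function classifyDrain(deps: Deps): Promise<DrainOutcome> {
  try {
    await deps.trigger();
    return "materialised";
  } catch (err) {
    // Tier 1: transient PG trouble, so the entry should go back on the buffer.
    if (isRetryablePgError(err)) return "requeue";
    const reason = err instanceof Error ? err.message : String(err);
    try {
      // Tier 2: record a terminal failure so the run stays visible.
      await deps.writeFailedRow(reason);
      return "failed_row_written";
    } catch {
      // Tier 3: even the failure row could not be written.
      return "buffer_fail";
    }
  }
}
```

With these stand-ins, a `trigger` that throws a connection-reset error maps to `"requeue"`, while one that throws an ordinary validation error maps to `"failed_row_written"` as long as the failure-row write succeeds. The sketch omits the malformed-snapshot case, where the handler rethrows before attempting the write because `environment` is missing.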