Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
160 commits
Select commit Hold shift + click to select a range
c00b148
feat: trigger mollifier phase 1 scaffolding
d-cs May 12, 2026
ae05184
feat(mollifier): trigger burst smoothing — Phase 1 (trip evaluator + …
d-cs May 12, 2026
31aefa1
chore(mollifier): address CodeRabbit review for phase-1 PR
d-cs May 14, 2026
f4bac2d
fix(mollifier): guard drainer shutdown registration against listener …
d-cs May 14, 2026
cfa6e9e
feat(mollifier): make resolveOrgFlag actually org-scoped via Organiza…
d-cs May 14, 2026
2dee88c
fix(mollifier): mock db.server in gate test to avoid eager prisma con…
d-cs May 14, 2026
4f5978a
chore(mollifier): address review follow-ups for phase-1 PR
d-cs May 14, 2026
6c55bf8
fix(mollifier): extend MollifierEvaluateGate input to carry orgFeatur…
d-cs May 14, 2026
74fe441
fix(mollifier): raise mollifierGate test timeout to 30s for postgresT…
d-cs May 14, 2026
2afbe12
fix(mollifier): keep trigger hot path DB-free and fail open on flag e…
d-cs May 14, 2026
8469561
fix(mollifier): bound drainer shutdown so a hung handler can't block …
d-cs May 14, 2026
d76bb9e
fix(mollifier): keep drainer loop alive across transient redis errors
d-cs May 14, 2026
f1efc41
fix(mollifier): add missing imports to readFallback.server.ts
d-cs May 14, 2026
275a5ba
chore(mollifier): fix misleading rate-counter comment + symmetric eva…
d-cs May 14, 2026
e699034
chore(mollifier): merge phase-1 and phase-2 server-changes into one file
d-cs May 14, 2026
e734490
chore(mollifier): drop fuzz tests to keep phase-1 PR focused
d-cs May 14, 2026
0bf53e7
chore(mollifier): drop drive-by enqueueSystem comment change
d-cs May 14, 2026
edd3250
chore(mollifier): rewrite server-changes note for external readers
d-cs May 14, 2026
1e087e2
chore(mollifier): clarify server-changes note — monitoring only, no d…
d-cs May 14, 2026
7d74b8a
refactor(mollifier): move DI seam types to the modules that define them
d-cs May 14, 2026
5f06709
fix(mollifier): degrade to disabled when redis host is unset, no main…
d-cs May 14, 2026
f91cbf2
fix(mollifier): bound drainer per-tick env fan-out via maxEnvsPerTick
d-cs May 14, 2026
b7e2655
refactor(mollifier): align drainer stop semantics with FairQueue / Ba…
d-cs May 14, 2026
24407fa
fix(mollifier): preserve env fairness when drainer slices
d-cs May 14, 2026
adc29fc
test(mollifier): pin no-starvation property for light env behind heav…
d-cs May 14, 2026
cb8a54d
fix(mollifier): typecheck — destructure popsPerTick to satisfy noUnch…
d-cs May 15, 2026
3daee33
test(mollifier): cover six previously-untested drainer behaviours
d-cs May 15, 2026
2cad05f
feat(mollifier): two-level org→env rotation in drainer for tenant-lev…
d-cs May 15, 2026
2348bf2
chore(mollifier): rewrite changeset as feature intro (drop delta-lang…
d-cs May 15, 2026
5610099
feat(mollifier): track org→envs in the buffer for clean org-level fai…
d-cs May 15, 2026
a1a0a85
revert(mollifier): use standard REDIS_* fallback and fail loud on mis…
d-cs May 15, 2026
650f025
refactor(mollifier): drop the redundant mollifier:envs SET
d-cs May 15, 2026
5163a65
refactor(mollifier): drop global FeatureFlag fallback in hot-path res…
d-cs May 15, 2026
c31eb22
fix(mollifier): pipeline per-tick org→env fan-out and reconcile shutd…
d-cs May 15, 2026
ed0c468
chore(mollifier): refresh redis-worker changeset for buffer-side org …
d-cs May 15, 2026
a467e9e
Merge branch 'main' into mollifier-phase-2
d-cs May 15, 2026
7344211
test(redis-worker): drop vi.fn handler spies from drainer tests
d-cs May 15, 2026
673c7d0
fix(webapp): validate mollifier drain shutdown timeout before startin…
d-cs May 15, 2026
60f2fb9
fix(webapp): validate mollifier drain shutdown timeout before startin…
d-cs May 15, 2026
9007053
switch info logging to debug
d-cs May 15, 2026
6487461
refactor(webapp): split mollifier drainer factory into create + start
d-cs May 15, 2026
f02de19
feat(webapp): MollifierSnapshot shared type for mollify + drainer
d-cs May 14, 2026
bf5f66b
test(webapp): failing tests for mollifier read-fallback
d-cs May 14, 2026
5f3c151
feat(webapp): implement read-fallback synthesising QUEUED/FAILED from…
d-cs May 14, 2026
f27db3b
feat(webapp): wire read-fallback to synthesise QUEUED/FAILED from buffer
d-cs May 14, 2026
e26b8a1
feat(webapp): expand SyntheticRun with snapshot-derived + trace fields
d-cs May 14, 2026
9f50216
refactor(webapp): extract #buildEngineTriggerInput so mollify path ca…
d-cs May 14, 2026
efda4f7
test(webapp): failing tests for mollifyTrigger
d-cs May 14, 2026
24dcdb5
feat(webapp): mollifyTrigger writes snapshot to buffer + returns synt…
d-cs May 14, 2026
0d53a2b
feat(webapp): wire real mollify branch in trigger hot path
d-cs May 14, 2026
be99fb4
feat(webapp): wire real mollify branch — remove phase-1 dual-write block
d-cs May 14, 2026
08ae016
test(webapp): failing tests for mollifier drainer handler
d-cs May 14, 2026
e7740d3
feat(webapp): drainer handler that replays engine.trigger from snapshot
d-cs May 14, 2026
fe85bcc
feat(webapp): wire real engine.trigger replay into MollifierDrainer
d-cs May 14, 2026
510ae57
feat(core): optional notice field on TriggerTaskResponse
d-cs May 14, 2026
7286ba7
feat(webapp): mollifier.drained OTEL span with dwell_ms + attempts
d-cs May 14, 2026
552d9e6
feat(webapp): per-env mollifier gate inputs + C1/C3/F4 bypasses
d-cs May 14, 2026
c5abd2d
docs: mollifier phase 3 server-changes note
d-cs May 14, 2026
80ae129
feat(webapp): wire mollifier read-fallback into v1 run-retrieve prese…
d-cs May 15, 2026
f6fb65d
feat(webapp): wire mollifier read-fallback into dashboard run-detail …
d-cs May 15, 2026
be81464
feat(webapp): Recently queued section on runs list + listEntriesForEn…
d-cs May 15, 2026
02c0b71
refactor(webapp): move mollifier drainer bootstrap out of legacy work…
d-cs May 15, 2026
ad90fe3
feat(webapp): MOLLIFIER_DRAINER_ENABLED for per-service drainer control
d-cs May 15, 2026
e5d403e
refactor(webapp): prefix mollifier env vars with TRIGGER_
d-cs May 15, 2026
50868ff
docs(review): clarify what the no-mocking rule is actually for
d-cs May 15, 2026
ee474b5
Merge branch 'main' into mollifier-phase-2
d-cs May 15, 2026
92d0841
fix(redis-worker): clear MollifierDrainer.stop() timeout timer when l…
d-cs May 15, 2026
0d12e7b
refactor(webapp): wire mollifier drainer shutdown through signalsEmitter
d-cs May 15, 2026
f2f4ba6
chore(review): revert the no-mocking-rule clarification
d-cs May 15, 2026
5255c47
perf(webapp): short-circuit mollifier gate when globally disabled
d-cs May 15, 2026
5c729a4
refactor(webapp): move the mollifier-globally-enabled check behind a …
d-cs May 18, 2026
f8c4077
Merge branch 'main' into mollifier-phase-2
d-cs May 18, 2026
c95e141
fix(webapp): fail loud on mollifier drainer misconfiguration
d-cs May 18, 2026
68ae8b0
test(webapp): pin mollifier drainer worker error-classification policy
d-cs May 18, 2026
83c6933
Merge branch 'main' into mollifier-phase-2
d-cs May 18, 2026
7435b2c
Merge branch 'mollifier-phase-2' into mollifier-phase-3
d-cs May 18, 2026
b96bae2
fix(redis-worker): catch processEntry errors in mollifier drainer to …
d-cs May 18, 2026
03ed0b6
Merge branch 'mollifier-phase-2' into mollifier-phase-3
d-cs May 18, 2026
b512583
test(redis-worker): allow timer jitter in mollifier drainer stop-time…
d-cs May 18, 2026
b608ef2
Merge branch 'mollifier-phase-2' into mollifier-phase-3
d-cs May 18, 2026
2e2fff2
test(webapp): bring mollifier integration tests on phase-3 in line wi…
d-cs May 18, 2026
f08eefc
feat(references): add stress-tasks reference project for trigger fan-…
d-cs May 11, 2026
a095e94
docs(stress-tasks): add MOLLIFIER_E2E example payload comment
d-cs May 12, 2026
c2d7d60
docs(stress-tasks): MOLLIFIER_SHADOW trip observation payload
d-cs May 12, 2026
0fa8ec5
docs(stress-tasks): rename mollifier env vars to TRIGGER_MOLLIFIER_* …
d-cs May 18, 2026
01433f5
chore(webapp): drop mollifier gate divert logs to debug
d-cs May 18, 2026
0351a67
feat(webapp): dismissible MollifierBanner on mollified run-detail page
d-cs May 18, 2026
4a8305a
docs(_plans): mollifier rollout playbook
d-cs May 18, 2026
175d759
Merge branch 'mollifier-phase-2' into mollifier-phase-3
d-cs May 18, 2026
854bf38
fix(webapp): seed mollifier run traceContext + propagate drainer trace
d-cs May 18, 2026
8c01cf0
Merge remote-tracking branch 'origin/main' into mollifier-phase-3
d-cs May 20, 2026
c8d036a
docs(_plans): mollifier API parity master plan + per-question designs
d-cs May 20, 2026
6b8a54e
feat(webapp): mollifier read-fallback for /api/v1/runs/{id}/trace
d-cs May 20, 2026
e21dbee
feat(webapp): mollifier read-fallback for spans + attempts + metadata…
d-cs May 20, 2026
015787c
docs(_plans): add progress tracking + Phase A patterns to master plan
d-cs May 20, 2026
709d2f5
feat(redis-worker): migrate mollifier queue from LIST to ZSET (Phase B1)
d-cs May 20, 2026
c193f53
docs(_plans): record B1 progress + requeue-semantics decision
d-cs May 20, 2026
22dbbc9
feat(redis-worker): mollifier ack marks materialised + grace TTL (Pha…
d-cs May 20, 2026
d727e0f
docs(_plans): record B2 progress (commit 22dbbc90f)
d-cs May 20, 2026
08f20c6
feat(redis-worker): add MollifierBuffer.mutateSnapshot (Phase B3)
d-cs May 20, 2026
5849f46
docs(_plans): record B3 progress (commit 08f20c65f)
d-cs May 20, 2026
612babf
feat(webapp): extend SyntheticRun for replay (Phase B4)
d-cs May 20, 2026
3650812
docs(_plans): record B4 progress (commit 612babf6c)
d-cs May 20, 2026
dea1c7c
feat(webapp,redis-worker): mutateWithFallback helper (Phase B5)
d-cs May 20, 2026
9c08f2f
docs(_plans): record B5 progress (commit dea1c7c0d)
d-cs May 20, 2026
0c7c07d
feat(redis-worker): mollifier buffer idempotency-key dedup (Phase B6a)
d-cs May 20, 2026
51b471c
feat(webapp): wire mollifier idempotency into trigger hot path (Phase…
d-cs May 20, 2026
d8a23aa
docs(_plans): record B6 + Phase B complete
d-cs May 20, 2026
d4f7342
feat(webapp,run-engine): cancel API supports buffered runs (Phase C1)
d-cs May 20, 2026
3534f13
fix(webapp): tags route handles buffered runs (Phase C2)
d-cs May 20, 2026
0183e43
feat(webapp): reschedule + replay APIs handle buffered runs (Phase C4…
d-cs May 20, 2026
6d04414
docs(_plans): record Phase C1-C5 status (C3 deferred pending product …
d-cs May 20, 2026
d5c1e22
feat(webapp,redis-worker): metadata PUT handles buffered runs (Phase C3)
d-cs May 20, 2026
63b0a35
docs(_plans): record C3 done (commit d5c1e22b1)
d-cs May 20, 2026
39e3bab
feat(webapp): dashboard cancel/replay/idempotencyKey-reset handle buf…
d-cs May 20, 2026
5b118d2
feat(webapp,redis-worker): listing endpoints merge buffered + PG runs…
d-cs May 21, 2026
0b989f3
docs(_plans): record Phase D + E done
d-cs May 21, 2026
a871022
test(scripts): tighten mollifier parity script with body assertions (…
d-cs May 21, 2026
f2ff1a9
test(run-engine): integration tests for engine.createCancelledRun (Ph…
d-cs May 21, 2026
8639fae
docs(_plans): record F1 + F3 done
d-cs May 21, 2026
c6f741a
test(scripts): mollifier challenge suite for manual API verification
d-cs May 21, 2026
345926f
test(scripts): cover busy → timeout path via direct Redis manipulation
d-cs May 21, 2026
c4dfd8b
docs(webapp): correct TRIGGER_MOLLIFIER_DRAINER_ENABLED comment
d-cs May 21, 2026
469dd3a
test(scripts): fix challenge-01 control trigger ordering
d-cs May 21, 2026
b490afe
fix(webapp): cancel API findResource must return non-null for buffere…
d-cs May 21, 2026
4e7d5d8
fix(webapp): bump applyMetadataMutation retry count + add jittered ba…
d-cs May 21, 2026
fd89156
test(scripts): challenge suite fixes from running validation
d-cs May 21, 2026
eef33e5
fix(run-engine): createCancelledRun normalises snapshot.tags
d-cs May 21, 2026
4e4925d
test: regression coverage for the 3 fixes found by Phase F validation
d-cs May 21, 2026
d499aa5
docs(_plans): pre-gate idempotency-key claim design
d-cs May 21, 2026
0b85126
feat: pre-gate idempotency-key claim serialises same-key triggers
d-cs May 21, 2026
1f90a00
test(scripts): stress tests for pre-gate idempotency claim
d-cs May 21, 2026
d7bdfb5
chore: gitignore .playwright-mcp/ runtime cache
d-cs May 21, 2026
213185b
feat(webapp): dashboard parity for mollifier-buffered runs
d-cs May 21, 2026
2052d3e
fix(webapp): dismiss cancel dialog on submit; reflect cancelled state…
d-cs May 21, 2026
a1fe5f7
feat(webapp): merge mollifier-buffered runs into the dashboard runs list
d-cs May 21, 2026
75c6e2b
fix(webapp): replay dialog falls back to the mollifier buffer
d-cs May 21, 2026
a15566d
fix(webapp): control cancel-run Dialog state so submit isn't raced by…
d-cs May 22, 2026
49b9d00
fix(webapp): make buffered API responses match SDK response shapes
d-cs May 22, 2026
3453618
fix(webapp): align buffered API responses with existing SDK schemas
d-cs May 22, 2026
d4b55c1
feat(webapp): write SYSTEM_FAILURE PG row when drainer hits a non-ret…
d-cs May 22, 2026
fbd5d2f
revert(webapp): drop mollifier listing-merge from runs list
d-cs May 22, 2026
4408743
feat(webapp): open run span before mollifier gate
d-cs May 22, 2026
1ece983
revert(webapp): drop buffered scan from bulk-action service
d-cs May 22, 2026
50106dc
fix(webapp): keep useRealtimeRun stream open across the buffered window
d-cs May 22, 2026
69e8535
test(webapp): pin realtime buffered-resource resolution
d-cs May 22, 2026
14253dc
feat(webapp): mollifier stale-entry sweep + OTel signal
d-cs May 22, 2026
af85cda
feat(webapp): alertable gauge for mollifier stale-entry signal
d-cs May 22, 2026
3549563
docs(mollifier): ops manual with alertable signals and recovery flows
d-cs May 22, 2026
449ded0
docs(mollifier): move ops manual to _ops/
d-cs May 22, 2026
8dc878e
feat(redis-worker,webapp): drop mollifier entry TTL — drainer is the …
d-cs May 22, 2026
97018b1
fix(run-engine): emit runFailed from createFailedTaskRun
d-cs May 22, 2026
3fa9984
test(scripts): mollifier SDK response shape audit (5.6)
d-cs May 22, 2026
92e7d2f
Merge remote-tracking branch 'origin/main' into mollifier-phase-3
d-cs May 22, 2026
e23c6ee
chore(mollifier): consolidate changesets + server-changes; untrack _p…
d-cs May 22, 2026
8e37100
chore(mollifier): rename changeset; untrack stress-tasks, challenge s…
d-cs May 22, 2026
4c0c969
chore(mollifier): restore historical changeset; add buffer-extensions
d-cs May 22, 2026
b5325ed
chore(mollifier): restore changeset content from main (do not edit hi…
d-cs May 22, 2026
3a1494d
chore(mollifier): untrack scripts/mollifier-api-parity.sh
d-cs May 22, 2026
3a9bca2
chore(mollifier): drop docs/realtime change from PR
d-cs May 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/mollifier-buffer-extensions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/redis-worker": minor
"@trigger.dev/core": patch
---

Mollifier buffer feature set built on top of the initial primitives: idempotency-lookup with SETNX dedup, atomic snapshot-mutation API (`mutateSnapshot` with tag/metadata/delay/cancel patches), metadata CAS for lossless concurrent updates, watermark-paginated listing, claim primitives for pre-gate idempotency, ZSET-backed per-env queue, 30s post-ack grace TTL, and removal of the accept-time entry TTL (the drainer is now the only removal mechanism). `@trigger.dev/core` gains an optional `notice` field on the trigger response so the SDK can surface mollifier-queued guidance to customers.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ apps/**/public/build
.mcp.log
.mcp.json
.cursor/debug.log
ailogger-output.log
ailogger-output.log
.playwright-mcp/
6 changes: 6 additions & 0 deletions .server-changes/mollifier.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows.
24 changes: 23 additions & 1 deletion apps/webapp/app/components/runs/v3/CancelRunDialog.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { NoSymbolIcon } from "@heroicons/react/24/solid";
import { DialogClose } from "@radix-ui/react-dialog";
import { Form, useNavigation } from "@remix-run/react";
import { useEffect, useRef } from "react";
import { Button } from "~/components/primitives/Buttons";
import { DialogContent, DialogHeader } from "~/components/primitives/Dialog";
import { FormButtons } from "~/components/primitives/FormButtons";
Expand All @@ -10,14 +11,35 @@ import { SpinnerWhite } from "~/components/primitives/Spinner";
type CancelRunDialogProps = {
runFriendlyId: string;
redirectPath: string;
// Optional: when provided, close the dialog as soon as the cancel
// action transitions to "loading" (the redirect is in flight). Lets
// the caller control the open state without interfering with the
// form's submit name=value pair the way `<DialogClose asChild>`
// around the submit button does.
onCancelSubmitted?: () => void;
};

export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialogProps) {
export function CancelRunDialog({
runFriendlyId,
redirectPath,
onCancelSubmitted,
}: CancelRunDialogProps) {
const navigation = useNavigation();

const formAction = `/resources/taskruns/${runFriendlyId}/cancel`;
const isLoading = navigation.formAction === formAction;

const wasSubmitting = useRef(false);
useEffect(() => {
if (!onCancelSubmitted) return;
if (navigation.state === "submitting" && navigation.formAction === formAction) {
wasSubmitting.current = true;
} else if (wasSubmitting.current && navigation.state !== "submitting") {
wasSubmitting.current = false;
onCancelSubmitted();
}
}, [navigation.state, navigation.formAction, formAction, onCancelSubmitted]);

return (
<DialogContent key="cancel">
<DialogHeader>Cancel this run?</DialogHeader>
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
Expand Down Expand Up @@ -219,6 +220,7 @@ Worker.init().catch((error) => {
});

initMollifierDrainerWorker();
initMollifierStaleSweepWorker();

bootstrap().catch((error) => {
logError(error);
Expand Down
41 changes: 32 additions & 9 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1062,13 +1062,16 @@ const EnvironmentSchema = z
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. In multi-replica deployments, set this to "0"
// explicitly on every replica except the one dedicated drainer
// service — otherwise every replica's polling loop races for the
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a
// buffer when the system is off.
// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
// can win any given entry — but inefficient: polling load (SMEMBERS +
// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
// is per-process so engine load also multiplies. Splitting the drainer
// onto a dedicated worker keeps that traffic off the request-serving
// replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch;
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a buffer
// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
Expand All @@ -1091,14 +1094,34 @@ const EnvironmentSchema = z
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().nonnegative().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),

// Periodic sweep that scans buffer queue ZSETs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
// ops. Defaults: enabled when the mollifier is enabled, run every
// 5 minutes, alert on anything that's been dwelling for 5+ minutes
// (matches the sweep interval — "anything still here when we
// check" is the simplest threshold that converges).
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
.string()
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),
TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
.int()
Expand Down
120 changes: 117 additions & 3 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";
Expand Down Expand Up @@ -64,13 +68,34 @@ type CommonRelatedRun = Prisma.Result<
"findFirstOrThrow"
>;

type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
// Full shape returned by findRun() — the commonRunSelect fields plus the
// extras the route handler reads. Declared explicitly (not inferred via
// ReturnType<typeof findRun>) so findRun can return a synthesised buffered
// run without the type becoming self-referential.
type FoundRun = CommonRelatedRun & {
// Fields added on top of the CommonRelatedRun shape.
traceId: string;
// Payload and output are carried as strings alongside their type strings;
// output may be null.
payload: string;
payloadType: string;
output: string | null;
outputType: string;
// Error as a JSON value.
error: Prisma.JsonValue;
// Only the id of each attempt is carried.
attempts: { id: string }[];
attemptNumber: number | null;
engine: "V1" | "V2";
taskEventStore: string;
// Related runs use the CommonRelatedRun shape; parent and root may be null.
parentTaskRun: CommonRelatedRun | null;
rootTaskRun: CommonRelatedRun | null;
childRuns: CommonRelatedRun[];
};

export class ApiRetrieveRunPresenter {
constructor(private readonly apiVersion: API_VERSIONS) {}

public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
return $replica.taskRun.findFirst({
public static async findRun(
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: env.id,
Expand Down Expand Up @@ -102,6 +127,23 @@ export class ApiRetrieveRunPresenter {
},
},
});

if (pgRow) return pgRow;

// Postgres miss → fall back to the mollifier buffer. When the gate
// diverted a trigger, the run lives in Redis until the drainer replays
// it through engine.trigger. Synthesise the FoundRun shape so call()
// returns a `QUEUED` (or `FAILED`) response with empty output, no
// attempts, no relations.
const buffered = await findRunByIdWithMollifierFallback({
runId: friendlyId,
environmentId: env.id,
organizationId: env.organizationId,
});

if (!buffered) return null;

return synthesiseFoundRunFromBuffer(buffered);
}

public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
Expand Down Expand Up @@ -475,3 +517,75 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
}
}

// The helpers below build a FoundRun-shaped object from a buffered
// (mollified) run. The run is in the Redis buffer; engine.trigger hasn't
// created the Postgres row yet, so every field that comes from execution
// state (output, attempts, completedAt, cost, relations) takes a default.
// The presenter's call() handles QUEUED-state runs without surprise.
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
  // Maps the status of a buffered run onto a TaskRunStatus value.
  // A buffered "FAILED" is reported as "SYSTEM_FAILURE".
  if (status === "FAILED") {
    return "SYSTEM_FAILURE";
  }

  // "CANCELED" carries over unchanged.
  if (status === "CANCELED") {
    return "CANCELED";
  }

  // Every other buffered status is reported as "PENDING".
  return "PENDING";
}

function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
  // Converts a buffered (mollified) run into the FoundRun shape. Fields with
  // no counterpart on the buffered run get fixed defaults: empty attempts and
  // relations, zero cost and usage, null output.
  const { cancelledAt, createdAt, friendlyId, lockedToVersion } = buffered;

  // A buffered error is flattened into a STRING_ERROR whose raw text is
  // "code: message"; without an error the field is null.
  let errorJson: Prisma.JsonValue = null;
  if (buffered.error) {
    const { code, message } = buffered.error;
    errorJson = {
      type: "STRING_ERROR",
      raw: `${code}: ${message}`,
    };
  }

  // Metadata and payload are passed through only when they are strings;
  // otherwise metadata becomes null and payload becomes the empty string.
  let metadata: Prisma.JsonValue = null;
  if (typeof buffered.metadata === "string") {
    metadata = buffered.metadata;
  }

  let payload = "";
  if (typeof buffered.payload === "string") {
    payload = buffered.payload;
  }

  return {
    // The friendly id is used for both id fields.
    id: friendlyId,
    friendlyId,
    status: bufferedStatusToTaskRunStatus(buffered.status),
    taskIdentifier: buffered.taskIdentifier ?? "",
    createdAt,
    startedAt: null,
    // The cancellation time, when present, doubles as updatedAt and
    // completedAt; otherwise updatedAt is createdAt and completedAt is null.
    updatedAt: cancelledAt ?? createdAt,
    completedAt: cancelledAt ?? null,
    expiredAt: null,
    delayUntil: buffered.delayUntil ?? null,
    metadata,
    metadataType: buffered.metadataType ?? "application/json",
    ttl: buffered.ttl ?? null,
    costInCents: 0,
    baseCostInCents: 0,
    usageDurationMs: 0,
    idempotencyKey: buffered.idempotencyKey ?? null,
    idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
    isTest: buffered.isTest,
    depth: buffered.depth,
    scheduleId: null,
    lockedToVersion: lockedToVersion ? { version: lockedToVersion } : null,
    resumeParentOnCompletion: buffered.resumeParentOnCompletion,
    batch: null,
    runTags: buffered.tags,
    traceId: buffered.traceId ?? "",
    payload,
    payloadType: buffered.payloadType ?? "application/json",
    output: null,
    outputType: "application/json",
    error: errorJson,
    attempts: [],
    attemptNumber: null,
    engine: "V2",
    taskEventStore: "taskEvent",
    workerQueue: buffered.workerQueue ?? "main",
    parentTaskRun: null,
    rootTaskRun: null,
    childRuns: [],
  };
}
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ export const ApiRunListSearchParams = z.object({
}),
});

type ApiRunListSearchParams = z.infer<typeof ApiRunListSearchParams>;
export type ApiRunListSearchParamsType = z.infer<typeof ApiRunListSearchParams>;
type ApiRunListSearchParams = ApiRunListSearchParamsType;

export class ApiRunListPresenter extends BasePresenter {
public async call(
Expand Down
42 changes: 36 additions & 6 deletions apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
import { throttle } from "~/utils/throttle";
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
import { deserialiseSnapshot } from "@trigger.dev/redis-worker";
import { tracePubSub } from "~/v3/services/tracePubSub.server";

const PING_INTERVAL = 5_000;
Expand Down Expand Up @@ -37,17 +39,45 @@ export class RunStreamPresenter {
},
});

if (!run) {
// Fall back to the mollifier buffer when the run isn't in PG yet.
// The buffered run has no execution events to stream, but we still
// attach a trace-pubsub subscription using the snapshot's traceId
// so that the moment the drainer materialises the row and execution
// begins, those events flow to this open SSE connection. Closing
// with 404 would force the dashboard to keep retrying.
let traceId: string | null = run?.traceId ?? null;
if (!traceId) {
const buffer = getMollifierBuffer();
if (buffer) {
try {
const entry = await buffer.getEntry(runFriendlyId);
if (entry) {
const snapshot = deserialiseSnapshot<{ traceId?: string }>(entry.payload);
if (typeof snapshot.traceId === "string") {
traceId = snapshot.traceId;
}
}
} catch (err) {
logger.warn("RunStreamPresenter buffer fallback failed", {
runFriendlyId,
err: err instanceof Error ? err.message : String(err),
});
}
}
}

if (!traceId) {
throw new Response("Not found", { status: 404 });
}
const resolvedRun = { traceId };

logger.info("RunStreamPresenter.start", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
});

// Subscribe to trace updates
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId);

// Only send max every 1 second
const throttledSend = throttle(
Expand Down Expand Up @@ -105,7 +135,7 @@ export class RunStreamPresenter {
cleanup: () => {
logger.info("RunStreamPresenter.cleanup", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
});

// Remove message listener
Expand All @@ -119,13 +149,13 @@ export class RunStreamPresenter {
.then(() => {
logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
});
})
.catch((error) => {
logger.error("RunStreamPresenter.cleanup.unsubscribe failed", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
error: {
name: error.name,
message: error.message,
Expand Down
Loading
Loading