Skip to content

Commit 612babf

Browse files
committed
feat(webapp): extend SyntheticRun for replay (Phase B4)
The mollifier read-fallback's SyntheticRun previously carried just enough fields for the API retrieve/trace/spans/events/attempts/metadata endpoints. Phase C5 (replay) needs the buffered run to be passable where ReplayTaskRunService expects a TaskRun. Adds the missing fields: id, runtimeEnvironmentId, engine, workerQueue, queue, concurrencyKey, machinePreset, realtimeStreamsVersion, seedMetadata, seedMetadataType, runTags. All populated from the engine-trigger snapshot embedded in the buffer entry. Also closes a pre-existing typecheck gap in ApiRetrieveRunPresenter.synthesiseFoundRunFromBuffer — workerQueue wasn't populated and the file had been failing tsc. Now surfaces the buffered run's workerQueue, defaulting to "main" (the Prisma default).
1 parent 5849f46 commit 612babf

4 files changed

Lines changed: 112 additions & 1 deletion

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Extend `SyntheticRun` (the mollifier read-fallback synthesised TaskRun shape) with the fields `ReplayTaskRunService` reads: `id`, `runtimeEnvironmentId`, `engine`, `workerQueue`, `queue`, `concurrencyKey`, `machinePreset`, `realtimeStreamsVersion`, `seedMetadata`, `seedMetadataType`, and `runTags`. Populated from the buffered run's engine-trigger snapshot. Also closes a pre-existing typecheck gap in `ApiRetrieveRunPresenter.synthesiseFoundRunFromBuffer` by surfacing `workerQueue` (defaulting to `"main"`) on the synthesised FoundRun.

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
573573
attemptNumber: null,
574574
engine: "V2",
575575
taskEventStore: "taskEvent",
576+
workerQueue: buffered.workerQueue ?? "main",
576577
parentTaskRun: null,
577578
rootTaskRun: null,
578579
childRuns: [],

apps/webapp/app/v3/mollifier/readFallback.server.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
2+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
23
import { logger } from "~/services/logger.server";
34
import { deserialiseMollifierSnapshot } from "./mollifierSnapshot.server";
45
import { getMollifierBuffer } from "./mollifierBuffer.server";
@@ -10,6 +11,10 @@ export type ReadFallbackInput = {
1011
};
1112

1213
export type SyntheticRun = {
14+
// Snapshot-derived TaskRun primary key. Used by ReplayTaskRunService
15+
// for logging and by callers passing this object where a TaskRun is
16+
// expected (cast). Derived deterministically from `friendlyId`.
17+
id: string;
1318
friendlyId: string;
1419
status: "QUEUED" | "FAILED";
1520
taskIdentifier: string | undefined;
@@ -19,13 +24,23 @@ export type SyntheticRun = {
1924
payloadType: string | undefined;
2025
metadata: unknown;
2126
metadataType: string | undefined;
27+
// Seed-metadata mirrors what `triggerTask.server.ts` writes into the
28+
// snapshot: the original metadataPacket data preserved separately from
29+
// any later customer mutations. ReplayTaskRunService uses these to
30+
// rebuild the replay's metadata.
31+
seedMetadata: string | undefined;
32+
seedMetadataType: string | undefined;
2233

2334
idempotencyKey: string | undefined;
2435
idempotencyKeyOptions: string[] | undefined;
2536
isTest: boolean;
2637
depth: number;
2738
ttl: string | undefined;
2839
tags: string[];
40+
// Mirror of `tags` under the PG field name. ReplayTaskRunService reads
41+
// `existingTaskRun.runTags`; both names are kept here so a synthetic
42+
// run can be passed wherever the PG-shape `runTags` is expected.
43+
runTags: string[];
2944
lockedToVersion: string | undefined;
3045
resumeParentOnCompletion: boolean;
3146
parentTaskRunId: string | undefined;
@@ -36,6 +51,17 @@ export type SyntheticRun = {
3651
spanId: string | undefined;
3752
parentSpanId: string | undefined;
3853

54+
// Replay-relevant fields populated from the engine-trigger snapshot.
55+
// ReplayTaskRunService reads each of these from the existing TaskRun;
56+
// when the original lives in the buffer we synthesise them here.
57+
runtimeEnvironmentId: string | undefined;
58+
engine: "V2";
59+
workerQueue: string | undefined;
60+
queue: string | undefined;
61+
concurrencyKey: string | undefined;
62+
machinePreset: string | undefined;
63+
realtimeStreamsVersion: string | undefined;
64+
3965
error?: { code: string; message: string };
4066
};
4167

@@ -77,7 +103,14 @@ export async function findRunByIdWithMollifierFallback(
77103
? asStringArray(idempotencyKeyOptionsRaw)
78104
: undefined;
79105

106+
const tags = asStringArray(snapshot.tags);
107+
const environment =
108+
snapshot.environment && typeof snapshot.environment === "object"
109+
? (snapshot.environment as Record<string, unknown>)
110+
: undefined;
111+
80112
return {
113+
id: RunId.fromFriendlyId(entry.runId),
81114
friendlyId: entry.runId,
82115
status: entry.status === "FAILED" ? "FAILED" : "QUEUED",
83116
taskIdentifier: asString(snapshot.taskIdentifier),
@@ -87,13 +120,16 @@ export async function findRunByIdWithMollifierFallback(
87120
payloadType: asString(snapshot.payloadType),
88121
metadata: snapshot.metadata,
89122
metadataType: asString(snapshot.metadataType),
123+
seedMetadata: asString(snapshot.seedMetadata),
124+
seedMetadataType: asString(snapshot.seedMetadataType),
90125

91126
idempotencyKey: asString(snapshot.idempotencyKey),
92127
idempotencyKeyOptions,
93128
isTest: snapshot.isTest === true,
94129
depth: typeof snapshot.depth === "number" ? snapshot.depth : 0,
95130
ttl: asString(snapshot.ttl),
96-
tags: asStringArray(snapshot.tags),
131+
tags,
132+
runTags: tags,
97133
lockedToVersion: asString(snapshot.lockToVersion),
98134
resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true,
99135
parentTaskRunId: asString(snapshot.parentTaskRunId),
@@ -102,6 +138,15 @@ export async function findRunByIdWithMollifierFallback(
102138
spanId: asString(snapshot.spanId),
103139
parentSpanId: asString(snapshot.parentSpanId),
104140

141+
runtimeEnvironmentId:
142+
asString(environment?.id) ?? entry.envId,
143+
engine: "V2",
144+
workerQueue: asString(snapshot.workerQueue),
145+
queue: asString(snapshot.queue),
146+
concurrencyKey: asString(snapshot.concurrencyKey),
147+
machinePreset: asString(snapshot.machine),
148+
realtimeStreamsVersion: asString(snapshot.realtimeStreamsVersion),
149+
105150
error: entry.lastError,
106151
};
107152
} catch (err) {

apps/webapp/test/mollifierReadFallback.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,63 @@ describe("findRunByIdWithMollifierFallback", () => {
216216
expect(result!.traceId).toBeUndefined();
217217
expect(result!.spanId).toBeUndefined();
218218
});
219+
220+
it("populates replay-relevant fields from the snapshot", async () => {
221+
const entry: BufferEntry = {
222+
runId: "run_1",
223+
envId: "env_a",
224+
orgId: "org_1",
225+
payload: JSON.stringify({
226+
taskIdentifier: "my-task",
227+
environment: { id: "env_a" },
228+
workerQueue: "default",
229+
queue: "task/my-task",
230+
concurrencyKey: "tenant-42",
231+
machine: "medium-1x",
232+
realtimeStreamsVersion: "v2",
233+
seedMetadata: '{"k":"v"}',
234+
seedMetadataType: "application/json",
235+
tags: ["t1", "t2"],
236+
}),
237+
status: "QUEUED",
238+
attempts: 0,
239+
createdAt: NOW,
240+
};
241+
const result = await findRunByIdWithMollifierFallback(
242+
{ runId: "run_1", environmentId: "env_a", organizationId: "org_1" },
243+
{ getBuffer: () => fakeBuffer(entry) },
244+
);
245+
expect(result).not.toBeNull();
246+
expect(result!.id).toBeTypeOf("string");
247+
expect(result!.id.length).toBeGreaterThan(0);
248+
expect(result!.engine).toBe("V2");
249+
expect(result!.runtimeEnvironmentId).toBe("env_a");
250+
expect(result!.workerQueue).toBe("default");
251+
expect(result!.queue).toBe("task/my-task");
252+
expect(result!.concurrencyKey).toBe("tenant-42");
253+
expect(result!.machinePreset).toBe("medium-1x");
254+
expect(result!.realtimeStreamsVersion).toBe("v2");
255+
expect(result!.seedMetadata).toBe('{"k":"v"}');
256+
expect(result!.seedMetadataType).toBe("application/json");
257+
expect(result!.runTags).toEqual(["t1", "t2"]);
258+
});
259+
260+
it("falls back to entry.envId for runtimeEnvironmentId when snapshot lacks environment.id", async () => {
261+
const entry: BufferEntry = {
262+
runId: "run_1",
263+
envId: "env_a",
264+
orgId: "org_1",
265+
payload: JSON.stringify({ taskIdentifier: "t" }),
266+
status: "QUEUED",
267+
attempts: 0,
268+
createdAt: NOW,
269+
};
270+
const result = await findRunByIdWithMollifierFallback(
271+
{ runId: "run_1", environmentId: "env_a", organizationId: "org_1" },
272+
{ getBuffer: () => fakeBuffer(entry) },
273+
);
274+
expect(result!.runtimeEnvironmentId).toBe("env_a");
275+
expect(result!.workerQueue).toBeUndefined();
276+
expect(result!.queue).toBeUndefined();
277+
});
219278
});

0 commit comments

Comments
 (0)