Skip to content

Commit 07f7c65

Browse files
authored
v3 queue system (triggerdotdev#903)
* Introducing Modular Asynchronous Reliable Queueing System (MarQS). Works in dev * Convert MarQS to using lua and dealing with concurrency * Simplified the timeout queue and current concurrency is now a set instead of a flat value (to support idempotency) * Implement task heartbeating and reconnect the background workers CLI when the websocket connection reconnects * Start adding internal telemetry support for the server * Get env vars to work in dev and implement prisma tracing in webapp * Cleanup telemetry and implement it in the consumer * Implement dequeuing a message from a parent shared queue * Implement a custom logger exporter instead of using console log exporter * Use node instead of shell for generating protocol buffer code * Propogate trace context into debug logs, and allow turning off logger exporter through env vars * Switch to using baselime for internal otel data * Make orgMember optional to fix type issues * Provide the CLI dev env vars through the CLI, don’t build dotenv into facade * Removed the logger import * Address Matt’s comments * Addressing more of Matt’s comments * Handle sending an execution after a websocket connection closes * Remove auth from the env attributes to prevent obfuscation
1 parent d04c8e8 commit 07f7c65

File tree

57 files changed

+2748
-746
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2748
-746
lines changed

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ NODE_ENV=development
3131
# FROM_EMAIL=
3232
# REPLY_TO_EMAIL=
3333

34+
# Remove the following line to enable logging telemetry traces to the console
35+
LOG_TELEMETRY="false"
36+
3437
# CLOUD VARIABLES
3538
POSTHOG_PROJECT_KEY=
3639
PLAIN_API_KEY=

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"request": "launch",
1010
"name": "Debug WebApp",
1111
"command": "pnpm run dev --filter webapp",
12-
"envFile": "${workspaceFolder}/apps/webapp/.env",
12+
"envFile": "${workspaceFolder}/.env",
1313
"cwd": "${workspaceFolder}",
1414
"sourceMaps": true
1515
},

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ const EnvironmentSchema = z.object({
7070

7171
//v3
7272
V3_ENABLED: z.string().default("false"),
73+
OTLP_EXPORTER_TRACES_URL: z.string().optional(),
74+
LOG_TELEMETRY: z.string().default("true"),
7375
});
7476

7577
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export async function findEnvironmentByApiKey(apiKey: string) {
1111
include: {
1212
project: true,
1313
organization: true,
14+
orgMember: true,
1415
},
1516
});
1617

@@ -30,6 +31,7 @@ export async function findEnvironmentByPublicApiKey(apiKey: string) {
3031
include: {
3132
project: true,
3233
organization: true,
34+
orgMember: true,
3335
},
3436
});
3537

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@ import type {
1010
TaskSpec,
1111
} from "graphile-worker";
1212
import { run as graphileRun, parseCronItems } from "graphile-worker";
13+
import { SpanKind, trace } from "@opentelemetry/api";
1314

1415
import omit from "lodash.omit";
1516
import { z } from "zod";
1617
import { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
1718
import { PgListenService } from "~/services/db/pgListen.server";
18-
import { workerLogger as logger, trace } from "~/services/logger.server";
19+
import { workerLogger as logger } from "~/services/logger.server";
20+
import { flattenAttributes } from "@trigger.dev/core/v3";
21+
22+
const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");
1923

2024
export interface MessageCatalogSchema {
2125
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
@@ -518,13 +522,43 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
518522
throw new Error(`No task for message type: ${String(typeName)}`);
519523
}
520524

521-
await trace(
525+
await tracer.startActiveSpan(
526+
`Run ${typeName as string}`,
522527
{
523-
worker_job: job,
524-
worker_name: this.#name,
528+
kind: SpanKind.CONSUMER,
529+
attributes: {
530+
"job.task_identifier": job.task_identifier,
531+
"job.id": job.id,
532+
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
533+
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
534+
"job.priority": job.priority,
535+
"job.run_at": job.run_at.toISOString(),
536+
"job.attempts": job.attempts,
537+
"job.max_attempts": job.max_attempts,
538+
"job.created_at": job.created_at.toISOString(),
539+
"job.updated_at": job.updated_at.toISOString(),
540+
...(job.key ? { "job.key": job.key } : {}),
541+
"job.revision": job.revision,
542+
...(job.locked_at ? { "job.locked_at": job.locked_at.toISOString() } : {}),
543+
...(job.locked_by ? { "job.locked_by": job.locked_by } : {}),
544+
...(job.flags ? flattenAttributes(job.flags, "job.flags") : {}),
545+
"worker.name": this.#name,
546+
},
525547
},
526-
async () => {
527-
await task.handler(payload, job);
548+
async (span) => {
549+
try {
550+
await task.handler(payload, job);
551+
} catch (error) {
552+
if (error instanceof Error) {
553+
span.recordException(error);
554+
} else {
555+
span.recordException(new Error(String(error)));
556+
}
557+
558+
throw error;
559+
} finally {
560+
span.end();
561+
}
528562
}
529563
);
530564
}
@@ -558,16 +592,45 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
558592

559593
const payload = parsedPayload.data;
560594

561-
try {
562-
await recurringTask.handler(payload._cron, job);
563-
} catch (error) {
564-
logger.error("Failed to handle recurring task", {
565-
error,
566-
payload,
567-
});
568-
569-
throw error;
570-
}
595+
await tracer.startActiveSpan(
596+
`Run ${typeName as string} recurring`,
597+
{
598+
kind: SpanKind.CONSUMER,
599+
attributes: {
600+
"job.task_identifier": job.task_identifier,
601+
"job.id": job.id,
602+
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
603+
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
604+
"job.priority": job.priority,
605+
"job.run_at": job.run_at.toISOString(),
606+
"job.attempts": job.attempts,
607+
"job.max_attempts": job.max_attempts,
608+
"job.created_at": job.created_at.toISOString(),
609+
"job.updated_at": job.updated_at.toISOString(),
610+
...(job.key ? { "job.key": job.key } : {}),
611+
"job.revision": job.revision,
612+
...(job.locked_at ? { "job.locked_at": job.locked_at.toISOString() } : {}),
613+
...(job.locked_by ? { "job.locked_by": job.locked_by } : {}),
614+
...(job.flags ? flattenAttributes(job.flags, "job.flags") : {}),
615+
"worker.name": this.#name,
616+
},
617+
},
618+
async (span) => {
619+
try {
620+
await recurringTask.handler(payload._cron, job);
621+
} catch (error) {
622+
if (error instanceof Error) {
623+
span.recordException(error);
624+
} else {
625+
span.recordException(new Error(String(error)));
626+
}
627+
628+
throw error;
629+
} finally {
630+
span.end();
631+
}
632+
}
633+
);
571634
}
572635

573636
async #handleCleanup(rawPayload: unknown, helpers: JobHelpers): Promise<void> {

apps/webapp/app/services/apiAuth.server.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
import { Prettify } from "@trigger.dev/core";
12
import { z } from "zod";
23
import {
34
findEnvironmentByApiKey,
45
findEnvironmentByPublicApiKey,
56
} from "~/models/runtimeEnvironment.server";
67

8+
type Optional<T, K extends keyof T> = Prettify<Omit<T, K> & Partial<Pick<T, K>>>;
9+
710
const AuthorizationHeaderSchema = z.string().regex(/^Bearer .+$/);
811

9-
export type AuthenticatedEnvironment = NonNullable<
10-
Awaited<ReturnType<typeof findEnvironmentByApiKey>>
12+
export type AuthenticatedEnvironment = Optional<
13+
NonNullable<Awaited<ReturnType<typeof findEnvironmentByApiKey>>>,
14+
"orgMember"
1115
>;
1216

1317
type ApiAuthenticationResult = {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import {
2+
ZodMessageHandler,
3+
ZodMessageSender,
4+
clientWebsocketMessages,
5+
serverWebsocketMessages,
6+
} from "@trigger.dev/core/v3";
7+
import { Evt } from "evt";
8+
import { randomUUID } from "node:crypto";
9+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
10+
import { logger } from "~/services/logger.server";
11+
import { EnvironmentQueueConsumer } from "./marqs/environmentQueueConsumer.server";
12+
import type { WebSocket, MessageEvent, CloseEvent, ErrorEvent } from "ws";
13+
14+
export class AuthenticatedSocketConnection {
15+
public id: string;
16+
public onClose: Evt<CloseEvent> = new Evt();
17+
18+
private _sender: ZodMessageSender<typeof serverWebsocketMessages>;
19+
private _environmentConsumer: EnvironmentQueueConsumer;
20+
private _messageHandler: ZodMessageHandler<typeof clientWebsocketMessages>;
21+
22+
constructor(public ws: WebSocket, public authenticatedEnv: AuthenticatedEnvironment) {
23+
this.id = randomUUID();
24+
25+
this._sender = new ZodMessageSender({
26+
schema: serverWebsocketMessages,
27+
sender: async (message) => {
28+
return new Promise((resolve, reject) => {
29+
ws.send(JSON.stringify(message), {}, (err) => {
30+
if (err) {
31+
reject(err);
32+
return;
33+
}
34+
35+
resolve();
36+
});
37+
});
38+
},
39+
});
40+
41+
this._environmentConsumer = new EnvironmentQueueConsumer(authenticatedEnv, this._sender);
42+
43+
ws.addEventListener("message", this.#handleMessage.bind(this));
44+
ws.addEventListener("close", this.#handleClose.bind(this));
45+
ws.addEventListener("error", this.#handleError.bind(this));
46+
47+
this._messageHandler = new ZodMessageHandler({
48+
schema: clientWebsocketMessages,
49+
messages: {
50+
READY_FOR_TASKS: async (payload) => {
51+
await this._environmentConsumer.registerBackgroundWorker(payload.backgroundWorkerId);
52+
},
53+
54+
BACKGROUND_WORKER_MESSAGE: async (payload) => {
55+
switch (payload.data.type) {
56+
case "TASK_RUN_COMPLETED": {
57+
await this._environmentConsumer.taskRunCompleted(
58+
payload.backgroundWorkerId,
59+
payload.data.completion
60+
);
61+
break;
62+
}
63+
case "TASK_HEARTBEAT": {
64+
await this._environmentConsumer.taskHeartbeat(
65+
payload.backgroundWorkerId,
66+
payload.data.id
67+
);
68+
break;
69+
}
70+
}
71+
},
72+
},
73+
});
74+
}
75+
76+
async initialize() {
77+
this._sender.send("SERVER_READY", { id: this.id });
78+
}
79+
80+
async #handleMessage(ev: MessageEvent) {
81+
const data = JSON.parse(ev.data.toString());
82+
83+
await this._messageHandler.handleMessage(data);
84+
}
85+
86+
async #handleClose(ev: CloseEvent) {
87+
await this._environmentConsumer.stop();
88+
89+
this.onClose.post(ev);
90+
}
91+
92+
async #handleError(ev: ErrorEvent) {
93+
logger.error("Websocket error", { ev });
94+
}
95+
}

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,15 @@ export type CreatableEventEnvironmentType = CreatableEvent["environmentType"];
2626
export type TraceAttributes = Partial<
2727
Pick<
2828
CreatableEvent,
29-
"attemptId" | "isError" | "runId" | "output" | "metadata" | "properties" | "style"
29+
| "attemptId"
30+
| "isError"
31+
| "runId"
32+
| "output"
33+
| "metadata"
34+
| "properties"
35+
| "style"
36+
| "queueId"
37+
| "queueName"
3038
>
3139
>;
3240

@@ -167,6 +175,8 @@ export class EventRepository {
167175
projectRef: options.environment.project.externalRef,
168176
runId: options.attributes.runId,
169177
taskSlug: options.taskSlug,
178+
queueId: options.attributes.queueId,
179+
queueName: options.attributes.queueName,
170180
properties: {
171181
...style,
172182
...(flattenAttributes(metadata, SemanticInternalAttributes.METADATA) as Record<

0 commit comments

Comments
 (0)