Skip to content

Commit 8fc8f57

Browse files
authored
v2: MarQS powered job executions (triggerdotdev#1149)
* WIP * Allow marqsv2 and v2 graphile to run in parallel * Fix missing GraphileLogger import * Fixed heartbeat after rebase * Replace postgres based run counters with redis ones with a backfill * Add back in the graphile logger * Remove duplicate visibility timeout calls * Clamp simple weighted strategy to max of 5
1 parent 9ebd91c commit 8fc8f57

29 files changed

+1203
-733
lines changed

apps/webapp/app/env.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,17 @@ const EnvironmentSchema = z.object({
174174
MARQS_DISABLE_REBALANCING: z.coerce.boolean().default(false),
175175

176176
VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
177+
V2_MARQS_ENABLED: z.string().default("0"),
178+
V2_MARQS_CONSUMER_POOL_ENABLED: z.string().default("0"),
179+
V2_MARQS_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
180+
V2_MARQS_CONSUMER_POLL_INTERVAL_MS: z.coerce.number().int().default(1000),
181+
V2_MARQS_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
182+
V2_MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
183+
.number()
184+
.int()
185+
.default(60 * 1000 * 15),
186+
V2_MARQS_DEFAULT_ENV_CONCURRENCY: z.coerce.number().int().default(100),
187+
V2_MARQS_VERBOSE: z.string().default("0"),
177188
});
178189

179190
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,16 +283,18 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
283283

284284
this.#shuttingDown = true;
285285

286+
this.#logDebug(
287+
`Received ${signal}, shutting down zodWorker with timeout ${this.#shutdownTimeoutInMs}ms`
288+
);
289+
286290
if (this.#shutdownTimeoutInMs) {
287291
setTimeout(() => {
288-
this.#logDebug("Shutdown timeout reached, exiting process");
292+
this.#logDebug(`Shutdown timeout of ${this.#shutdownTimeoutInMs} reached, exiting process`);
289293

290294
process.exit(0);
291295
}, this.#shutdownTimeoutInMs);
292296
}
293297

294-
this.#logDebug(`Received ${signal}, shutting down zodWorker...`);
295-
296298
this.stop().finally(() => {
297299
this.#logDebug("zodWorker stopped");
298300
});
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import Redis, { RedisOptions } from "ioredis";
2+
import {
3+
$transaction,
4+
Prisma,
5+
PrismaClientOrTransaction,
6+
PrismaTransactionOptions,
7+
prisma,
8+
} from "~/db.server";
9+
import { env } from "~/env.server";
10+
import { singleton } from "~/utils/singleton";
11+
12+
export type AutoIncrementCounterOptions = {
13+
redis: RedisOptions;
14+
};
15+
16+
export class AutoIncrementCounter {
17+
private _redis: Redis;
18+
19+
constructor(private options: AutoIncrementCounterOptions) {
20+
this._redis = new Redis(options.redis);
21+
}
22+
23+
async incrementInTransaction<T>(
24+
key: string,
25+
callback: (num: number, tx: PrismaClientOrTransaction) => Promise<T>,
26+
backfiller?: (key: string, db: PrismaClientOrTransaction) => Promise<number | undefined>,
27+
client: PrismaClientOrTransaction = prisma,
28+
transactionOptions?: PrismaTransactionOptions
29+
): Promise<T | undefined> {
30+
let performedIncrement = false;
31+
let performedBackfill = false;
32+
33+
try {
34+
return await $transaction(
35+
client,
36+
async (tx) => {
37+
let newNumber = await this.#increment(key);
38+
39+
performedIncrement = true;
40+
41+
if (newNumber === 1 && backfiller) {
42+
const backfilledNumber = await backfiller(key, tx);
43+
44+
if (backfilledNumber && backfilledNumber > 1) {
45+
newNumber = backfilledNumber + 1;
46+
await this._redis.set(key, newNumber);
47+
performedBackfill = true;
48+
}
49+
}
50+
51+
return await callback(newNumber, tx);
52+
},
53+
transactionOptions
54+
);
55+
} catch (e) {
56+
if (
57+
e instanceof Prisma.PrismaClientKnownRequestError ||
58+
e instanceof Prisma.PrismaClientUnknownRequestError ||
59+
e instanceof Prisma.PrismaClientValidationError
60+
) {
61+
if (performedIncrement && !performedBackfill) {
62+
await this._redis.decr(key);
63+
}
64+
}
65+
66+
throw e;
67+
}
68+
}
69+
70+
async #increment(key: string): Promise<number> {
71+
return await this._redis.incr(key);
72+
}
73+
}
74+
75+
export const autoIncrementCounter = singleton("auto-increment-counter", getAutoIncrementCounter);
76+
77+
function getAutoIncrementCounter() {
78+
if (!env.REDIS_HOST || !env.REDIS_PORT) {
79+
throw new Error(
80+
"Could not initialize auto-increment counter because process.env.REDIS_HOST and process.env.REDIS_PORT are required to be set. "
81+
);
82+
}
83+
84+
return new AutoIncrementCounter({
85+
redis: {
86+
keyPrefix: "auto-counter:",
87+
port: env.REDIS_PORT,
88+
host: env.REDIS_HOST,
89+
username: env.REDIS_USERNAME,
90+
password: env.REDIS_PASSWORD,
91+
enableAutoPipelining: true,
92+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
93+
},
94+
});
95+
}

apps/webapp/app/services/jobs/registerJob.server.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import {
66
assertExhaustive,
77
} from "@trigger.dev/core";
88
import type { Endpoint, Integration, Job, JobIntegration, JobVersion } from "@trigger.dev/database";
9-
import { DEFAULT_MAX_CONCURRENT_RUNS } from "~/consts";
109
import type { PrismaClient } from "~/db.server";
1110
import { prisma } from "~/db.server";
1211
import { ExtendedEndpoint, findEndpoint } from "~/models/endpoint.server";
1312
import type { RuntimeEnvironment } from "~/models/runtimeEnvironment.server";
13+
import { putConcurrencyLimitGroup, putJobConcurrencyLimit } from "~/v3/marqs/v2.server";
1414
import type { AuthenticatedEnvironment } from "../apiAuth.server";
1515
import { logger } from "../logger.server";
1616
import { RegisterScheduleSourceService } from "../schedules/registerScheduleSource.server";
@@ -175,13 +175,17 @@ export class RegisterJobService {
175175

176176
try {
177177
if (jobVersion.concurrencyLimitGroup) {
178-
// Upsert the maxSize for the concurrency limit group
178+
// Upsert the maxSize for the concurrency limit group (marqs v2)
179+
await putConcurrencyLimitGroup(jobVersion.concurrencyLimitGroup, environment);
180+
181+
// Upsert the maxSize for the concurrency limit group (legacy)
179182
await executionRateLimiter?.putConcurrencyLimitGroup(
180183
jobVersion.concurrencyLimitGroup,
181184
environment
182185
);
183186
}
184187

188+
await putJobConcurrencyLimit(job, jobVersion, environment);
185189
await executionRateLimiter?.putJobVersionConcurrencyLimit(jobVersion, environment);
186190
} catch (error) {
187191
logger.error("Error setting concurrency limit", {

apps/webapp/app/services/runs/performRunExecutionV3.server.ts

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,7 @@ import {
1616
supportsFeature,
1717
} from "@trigger.dev/core";
1818
import { BloomFilter } from "@trigger.dev/core-backend";
19-
import {
20-
ConcurrencyLimitGroup,
21-
JobRun,
22-
JobVersion,
23-
RuntimeEnvironment,
24-
} from "@trigger.dev/database";
19+
import { ConcurrencyLimitGroup, Job, JobRun, JobVersion } from "@trigger.dev/database";
2520
import { generateErrorMessage } from "zod-error";
2621
import { eventRecordToApiJson } from "~/api.server";
2722
import {
@@ -31,22 +26,24 @@ import {
3126
RUN_CHUNK_EXECUTION_BUFFER,
3227
} from "~/consts";
3328
import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server";
29+
import { env } from "~/env.server";
3430
import { detectResponseIsTimeout } from "~/models/endpoint.server";
3531
import { isRunCompleted } from "~/models/jobRun.server";
3632
import { resolveRunConnections } from "~/models/runConnection.server";
3733
import { prepareTasksForCaching, prepareTasksForCachingLegacy } from "~/models/task.server";
3834
import { CompleteRunTaskService } from "~/routes/api.v1.runs.$runId.tasks.$id.complete/CompleteRunTaskService.server";
3935
import { formatError } from "~/utils/formatErrors.server";
4036
import { safeJsonZodParse } from "~/utils/json";
37+
import { marqsv2 } from "~/v3/marqs/v2.server";
38+
import { AuthenticatedEnvironment } from "../apiAuth.server";
4139
import { EndpointApi } from "../endpointApi.server";
4240
import { createExecutionEvent } from "../executions/createExecutionEvent.server";
4341
import { logger } from "../logger.server";
42+
import { executionRateLimiter } from "../runExecutionRateLimiter.server";
4443
import { ResumeTaskService } from "../tasks/resumeTask.server";
4544
import { executionWorker, workerQueue } from "../worker.server";
4645
import { forceYieldCoordinator } from "./forceYieldCoordinator.server";
4746
import { ResumeRunService } from "./resumeRun.server";
48-
import { executionRateLimiter } from "../runExecutionRateLimiter.server";
49-
import { env } from "~/env.server";
5047

5148
type FoundRun = NonNullable<Awaited<ReturnType<typeof findRun>>>;
5249
type FoundTask = FoundRun["tasks"][number];
@@ -96,9 +93,10 @@ export class PerformRunExecutionV3Service {
9693
static async enqueue(
9794
run: JobRun & {
9895
version: JobVersion & {
99-
environment: RuntimeEnvironment;
96+
environment: AuthenticatedEnvironment;
10097
concurrencyLimitGroup?: ConcurrencyLimitGroup | null;
10198
};
99+
job: Job;
102100
},
103101
priority: RunExecutionPriority,
104102
tx: PrismaClientOrTransaction,
@@ -107,27 +105,49 @@ export class PerformRunExecutionV3Service {
107105
skipRetrying?: boolean;
108106
} = {}
109107
) {
110-
return await executionWorker.enqueue(
111-
"performRunExecutionV3",
112-
{
113-
id: run.id,
114-
reason: "EXECUTE_JOB",
115-
},
116-
{
117-
tx,
118-
runAt: options.runAt,
119-
jobKey: `job_run:EXECUTE_JOB:${run.id}`,
120-
maxAttempts: options.skipRetrying ? env.DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined,
121-
flags: executionRateLimiter?.flagsForRun(run, run.version) ?? [],
122-
priority: priority === "initial" ? 0 : -1,
108+
if (marqsv2 && run.version.environment.organization.v2MarqsEnabled) {
109+
let queue = `job/${run.job.slug}`;
110+
111+
if (run.version.concurrencyLimitGroup) {
112+
queue = `group/${run.version.concurrencyLimitGroup.name}`;
123113
}
124-
);
114+
115+
const runAt =
116+
priority === "initial" ? options.runAt ?? new Date() : run.startedAt ?? run.createdAt;
117+
118+
await marqsv2.enqueueMessage(
119+
run.version.environment,
120+
queue,
121+
run.id,
122+
{ runId: run.id, attempt: 1 },
123+
undefined,
124+
runAt.getTime()
125+
);
126+
} else {
127+
return await executionWorker.enqueue(
128+
"performRunExecutionV3",
129+
{
130+
id: run.id,
131+
reason: "EXECUTE_JOB",
132+
},
133+
{
134+
tx,
135+
runAt: options.runAt,
136+
jobKey: `job_run:EXECUTE_JOB:${run.id}`,
137+
maxAttempts: options.skipRetrying ? env.DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined,
138+
flags: executionRateLimiter?.flagsForRun(run, run.version) ?? [],
139+
priority: priority === "initial" ? 0 : -1,
140+
}
141+
);
142+
}
125143
}
126144

127145
static async dequeue(run: JobRun, tx: PrismaClientOrTransaction) {
128146
await executionWorker.dequeue(`job_run:EXECUTE_JOB:${run.id}`, {
129147
tx,
130148
});
149+
150+
await marqsv2?.acknowledgeMessage(run.id);
131151
}
132152

133153
async #executeJob(run: FoundRun, input: PerformRunExecutionV3Input, driftInMs: number = 0) {
@@ -254,6 +274,10 @@ export class PerformRunExecutionV3Service {
254274

255275
forceYieldCoordinator.deregisterRun(run.id);
256276

277+
if (marqsv2 && run.organization.v2MarqsEnabled) {
278+
await marqsv2.acknowledgeMessage(run.id);
279+
}
280+
257281
//if the run has been canceled while it's being executed, we shouldn't do anything more
258282
const updatedRun = await this.#prismaClient.jobRun.findUnique({
259283
select: {

apps/webapp/app/services/runs/resumeRun.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,15 @@ async function findRun(prisma: PrismaClientOrTransaction, id: string) {
147147
return await prisma.jobRun.findUnique({
148148
where: { id },
149149
include: {
150+
job: true,
150151
version: {
151152
include: {
152-
environment: true,
153+
environment: {
154+
include: {
155+
organization: true,
156+
project: true,
157+
},
158+
},
153159
concurrencyLimitGroup: true,
154160
},
155161
},

apps/webapp/app/services/runs/startRun.server.ts

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import {
44
type IntegrationConnection,
55
} from "@trigger.dev/database";
66
import type { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
7-
import { $transaction, prisma } from "~/db.server";
7+
import { prisma } from "~/db.server";
8+
import { autoIncrementCounter } from "../autoIncrementCounter.server";
9+
import { logger } from "../logger.server";
810
import { workerQueue } from "../worker.server";
911
import { ResumeRunService } from "./resumeRun.server";
10-
import { createHash } from "node:crypto";
11-
import { logger } from "../logger.server";
1212

1313
type FoundRun = NonNullable<Awaited<ReturnType<typeof findRun>>>;
1414
type RunConnectionsByKey = Awaited<ReturnType<typeof createRunConnections>>;
@@ -67,22 +67,14 @@ export class StartRunService {
6767
: undefined
6868
)
6969
.filter(Boolean);
70-
const lockId = jobIdToLockId(run.jobId);
71-
72-
await $transaction(
73-
this.#prismaClient,
74-
async (tx) => {
75-
const counter = await tx.jobCounter.upsert({
76-
where: { jobId: run.jobId },
77-
update: { lastNumber: { increment: 1 } },
78-
create: { jobId: run.jobId, lastNumber: 1 },
79-
select: { lastNumber: true },
80-
});
8170

82-
const updatedRun = await this.#prismaClient.jobRun.update({
71+
await autoIncrementCounter.incrementInTransaction(
72+
`v2-run:${run.jobId}`,
73+
async (num, tx) => {
74+
const updatedRun = await tx.jobRun.update({
8375
where: { id },
8476
data: {
85-
number: counter.lastNumber,
77+
number: num,
8678
status: "QUEUED",
8779
queuedAt: new Date(),
8880
runConnections: {
@@ -93,7 +85,16 @@ export class StartRunService {
9385

9486
await ResumeRunService.enqueue(updatedRun, tx);
9587
},
96-
{ timeout: 60000 }
88+
async (_, tx) => {
89+
const counter = await tx.jobCounter.findUnique({
90+
where: { jobId: run.jobId },
91+
select: { lastNumber: true },
92+
});
93+
94+
return counter?.lastNumber;
95+
},
96+
this.#prismaClient,
97+
{ timeout: 10_000 }
9798
);
9899
}
99100

@@ -242,8 +243,3 @@ async function createRunConnections(tx: PrismaClientOrTransaction, run: FoundRun
242243
function hasMissingConnections(runConnectionsByKey: RunConnectionsByKey) {
243244
return Object.values(runConnectionsByKey).some((connection) => connection.result === "missing");
244245
}
245-
246-
function jobIdToLockId(jobId: string): number {
247-
// Convert jobId to a unique lock identifier
248-
return parseInt(createHash("sha256").update(jobId).digest("hex").slice(0, 8), 16);
249-
}

apps/webapp/app/services/tasks/resumeTask.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ export class ResumeTaskService {
5151
logger.debug("ResumeTaskService.call resuming run execution", {
5252
parent: task.parent,
5353
taskId: task.id,
54+
runId: task.run.id,
55+
org: task.run.organizationId,
56+
environment: task.run.environmentId,
5457
});
5558

5659
if (task.parent && task.parent.childExecutionMode === "PARALLEL") {

0 commit comments

Comments
 (0)