From 358e466a37876cb3af583eb10fcbce4984cafb19 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 5 Jun 2024 17:32:01 +0100 Subject: [PATCH 1/8] WIP --- apps/webapp/analyze_marqs.mjs | 147 ++++++++ apps/webapp/app/env.server.ts | 10 + apps/webapp/app/platform/zodWorker.server.ts | 14 +- .../app/services/jobs/registerJob.server.ts | 12 +- .../runs/performRunExecutionV3.server.ts | 48 +-- .../app/services/runs/resumeRun.server.ts | 8 +- .../app/services/tasks/resumeTask.server.ts | 3 + apps/webapp/app/services/worker.server.ts | 13 + apps/webapp/app/v3/friendlyIdentifiers.ts | 4 +- apps/webapp/app/v3/marqs/index.server.ts | 201 +++++++---- .../app/v3/marqs/marqsKeyProducer.server.ts | 4 + .../app/v3/marqs/priorityStrategy.server.ts | 156 --------- .../app/v3/marqs/requeueV2Message.server.ts | 27 ++ .../v3/marqs/sharedQueueConsumer.server.ts | 5 +- .../simpleWeightedPriorityStrategy.server.ts | 127 +++++++ apps/webapp/app/v3/marqs/types.ts | 15 +- apps/webapp/app/v3/marqs/v2.server.ts | 314 +++++++++++++++++ .../v3/marqs/v3VisibilityTimeout.server.ts | 12 + apps/webapp/test/marqs.test.ts | 316 ------------------ perf/src/index.ts | 29 +- perf/src/trigger.ts | 52 +++ references/job-catalog/src/stressTest.ts | 3 + 22 files changed, 935 insertions(+), 585 deletions(-) create mode 100755 apps/webapp/analyze_marqs.mjs delete mode 100644 apps/webapp/app/v3/marqs/priorityStrategy.server.ts create mode 100644 apps/webapp/app/v3/marqs/requeueV2Message.server.ts create mode 100644 apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts create mode 100644 apps/webapp/app/v3/marqs/v2.server.ts create mode 100644 apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts delete mode 100644 apps/webapp/test/marqs.test.ts diff --git a/apps/webapp/analyze_marqs.mjs b/apps/webapp/analyze_marqs.mjs new file mode 100755 index 00000000000..38a2bebac1a --- /dev/null +++ b/apps/webapp/analyze_marqs.mjs @@ -0,0 +1,147 @@ +#!/usr/bin/env node + +const filename = process.argv[2]; + +if (!filename) { + console.error("Usage: analyze_marqs.mjs "); + process.exit(1); +} + +import fs from "fs/promises"; +import util from "util"; + +(async () => { + try { + const input = await fs.readFile(filename, "utf-8"); + + await processInput(input); + } catch (err) { + console.error(`Error reading file: ${err}`); + process.exit(1); + } +})(); + +// input is jsonl format, we want to split by line and then JSON parse each line +async function processInput(input) { + const rows = []; + const lines = input.split("\n"); + for (const line of lines) { + if (!line) { + continue; + } + + const row = JSON.parse(line); + + // process each row + rows.push(row); + } + + const queueChoiceCounts = {}; + const queueMaxAges = {}; + const queueMaxSizes = {}; + const nextRangeOffsetCounts = {}; + const consumerStats = {}; + const rowsByConsumer = {}; + + console.log(`Processed ${rows.length} rows`); + + // console.log(util.inspect(rows[0], { depth: 20 })); + + for (const row of rows) { + const queueChoice = row.queueChoice; + + if (queueChoice) { + if (!queueChoiceCounts[queueChoice]) { + queueChoiceCounts[queueChoice] = 0; + } + queueChoiceCounts[queueChoice]++; + } + } + + for (const row of rows) { + rowsByConsumer[row.consumerId] = rowsByConsumer[row.consumerId] || []; + rowsByConsumer[row.consumerId].push(row); + + const queueChoice = row.queueChoice; + + if (!consumerStats[row.consumerId]) { + consumerStats[row.consumerId] = { + queueChoiceCounts: {}, + totalQueueChoices: 0, + noQueueChoiceCount: 0, + }; + } + + if (queueChoice) { + const queueData = row.queuesWithScores.find((queue) => queue.queue === queueChoice); + + console.log( + `[${row.timestamp}] -> ${queueChoice} [age:${queueData.age}] [size:${queueData.size}] [nextRange.offset=${row.nextRange.offset}] [queuesWithScores=${row.queuesWithScores.length}] [${row.consumerId}]` + ); + + if (!queueMaxAges[queueChoice] || queueData.age > queueMaxAges[queueChoice]) { + queueMaxAges[queueChoice] = queueData.age; + } + + if (!queueMaxSizes[queueChoice] || queueData.size > queueMaxSizes[queueChoice]) { + queueMaxSizes[queueChoice] = queueData.size; + } + + if (!consumerStats[row.consumerId].queueChoiceCounts[queueChoice]) { + consumerStats[row.consumerId].queueChoiceCounts[queueChoice] = 0; + } + + consumerStats[row.consumerId].queueChoiceCounts[queueChoice]++; + consumerStats[row.consumerId].totalQueueChoices++; + } else { + console.log( + `[${row.timestamp}] -> No queue choice [nextRange.offset=${row.nextRange.offset}] [queuesWithScores=${row.queuesWithScores.length}] [${row.consumerId}]` + ); + + consumerStats[row.consumerId].noQueueChoiceCount++; + } + + if (!nextRangeOffsetCounts[row.nextRange.offset]) { + nextRangeOffsetCounts[row.nextRange.offset] = 0; + } + + nextRangeOffsetCounts[row.nextRange.offset]++; + } + + console.log("Queue choice counts:"); + console.log(queueChoiceCounts); + + console.log("Queue max ages:"); + console.log(queueMaxAges); + + console.log("Queue max sizes:"); + console.log(queueMaxSizes); + + console.log("Next range offset counts:"); + console.log(nextRangeOffsetCounts); + + for (const consumerId in consumerStats) { + console.log(`Consumer ${consumerId}:`); + console.log(consumerStats[consumerId]); + } + + for (const consumerId in rowsByConsumer) { + console.log(`\n## Consumer ${consumerId}:`); + + for (const row of rowsByConsumer[consumerId]) { + const queueChoice = row.queueChoice; + + if (queueChoice) { + const queueData = row.queuesWithScores.find((queue) => queue.queue === queueChoice); + + console.log( + `[${row.timestamp}] -> ${queueChoice} [age:${queueData.age}] [size:${queueData.size}] [nextRange.offset=${row.nextRange.offset}] [queuesWithScores=${row.queuesWithScores.length}] [${row.consumerId}]` + ); + } else { + console.log( + `[${row.timestamp}] -> No queue choice [nextRange.offset=${row.nextRange.offset}] [queuesWithScores=${row.queuesWithScores.length}] [${row.consumerId}]` + ); + } + } + } +} diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index a1a44b60f19..5f7697bbae9 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -174,6 +174,16 @@ const EnvironmentSchema = z.object({ MARQS_DISABLE_REBALANCING: z.coerce.boolean().default(false), VERBOSE_GRAPHILE_LOGGING: z.string().default("false"), + V2_MARQS_CONSUMER_POOL_ENABLED: z.string().default("0"), + V2_MARQS_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10), + V2_MARQS_CONSUMER_POLL_INTERVAL_MS: z.coerce.number().int().default(1000), + V2_MARQS_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36), + V2_MARQS_VISIBILITY_TIMEOUT_MS: z.coerce + .number() + .int() + .default(60 * 1000 * 15), + V2_MARQS_DEFAULT_ENV_CONCURRENCY: z.coerce.number().int().default(100), + V2_MARQS_VERBOSE: z.string().default("0"), }); export type Environment = z.infer; diff --git a/apps/webapp/app/platform/zodWorker.server.ts b/apps/webapp/app/platform/zodWorker.server.ts index 80e26763d81..96e85007a83 100644 --- a/apps/webapp/app/platform/zodWorker.server.ts +++ b/apps/webapp/app/platform/zodWorker.server.ts @@ -28,6 +28,12 @@ import { env } from "~/env.server"; const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1"); +const graphileLogger = new GraphileLogger((scope) => { + return (level, message, meta) => { + logger.debug(`[graphile-worker][${level}] ${message}`, { scope, meta }); + }; +}); + export interface MessageCatalogSchema { [key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion; } @@ -283,16 +289,18 @@ export class ZodWorker { this.#shuttingDown = true; + this.#logDebug( + `Received ${signal}, shutting down zodWorker with timeout ${this.#shutdownTimeoutInMs}ms` + ); + if (this.#shutdownTimeoutInMs) { setTimeout(() => { - this.#logDebug("Shutdown timeout reached, exiting process"); + this.#logDebug(`Shutdown timeout of ${this.#shutdownTimeoutInMs} reached, exiting process`); process.exit(0); }, this.#shutdownTimeoutInMs); } - this.#logDebug(`Received ${signal}, shutting down zodWorker...`); - this.stop().finally(() => { this.#logDebug("zodWorker stopped"); }); diff --git a/apps/webapp/app/services/jobs/registerJob.server.ts b/apps/webapp/app/services/jobs/registerJob.server.ts index 843b054e62c..25798583f08 100644 --- a/apps/webapp/app/services/jobs/registerJob.server.ts +++ b/apps/webapp/app/services/jobs/registerJob.server.ts @@ -6,15 +6,14 @@ import { assertExhaustive, } from "@trigger.dev/core"; import type { Endpoint, Integration, Job, JobIntegration, JobVersion } from "@trigger.dev/database"; -import { DEFAULT_MAX_CONCURRENT_RUNS } from "~/consts"; import type { PrismaClient } from "~/db.server"; import { prisma } from "~/db.server"; import { ExtendedEndpoint, findEndpoint } from "~/models/endpoint.server"; import type { RuntimeEnvironment } from "~/models/runtimeEnvironment.server"; +import { putConcurrencyLimitGroup, putJobConcurrencyLimit } from "~/v3/marqs/v2.server"; import type { AuthenticatedEnvironment } from "../apiAuth.server"; import { logger } from "../logger.server"; import { RegisterScheduleSourceService } from "../schedules/registerScheduleSource.server"; -import { executionRateLimiter } from "../runExecutionRateLimiter.server"; export class RegisterJobService { #prismaClient: PrismaClient; @@ -176,13 +175,10 @@ export class RegisterJobService { try { if (jobVersion.concurrencyLimitGroup) { // Upsert the maxSize for the concurrency limit group - await executionRateLimiter?.putConcurrencyLimitGroup( - jobVersion.concurrencyLimitGroup, - environment - ); + await putConcurrencyLimitGroup(jobVersion.concurrencyLimitGroup, environment); + } else if (typeof jobVersion.concurrencyLimit === "number") { + await putJobConcurrencyLimit(job, environment, jobVersion.concurrencyLimit); } - - await executionRateLimiter?.putJobVersionConcurrencyLimit(jobVersion, environment); } catch (error) { logger.error("Error setting concurrency limit", { error, diff --git a/apps/webapp/app/services/runs/performRunExecutionV3.server.ts b/apps/webapp/app/services/runs/performRunExecutionV3.server.ts index ae8c520f0cd..d5bdd9fa3cc 100644 --- a/apps/webapp/app/services/runs/performRunExecutionV3.server.ts +++ b/apps/webapp/app/services/runs/performRunExecutionV3.server.ts @@ -16,12 +16,7 @@ import { supportsFeature, } from "@trigger.dev/core"; import { BloomFilter } from "@trigger.dev/core-backend"; -import { - ConcurrencyLimitGroup, - JobRun, - JobVersion, - RuntimeEnvironment, -} from "@trigger.dev/database"; +import { ConcurrencyLimitGroup, Job, JobRun, JobVersion } from "@trigger.dev/database"; import { generateErrorMessage } from "zod-error"; import { eventRecordToApiJson } from "~/api.server"; import { @@ -38,6 +33,8 @@ import { prepareTasksForCaching, prepareTasksForCachingLegacy } from "~/models/t import { CompleteRunTaskService } from "~/routes/api.v1.runs.$runId.tasks.$id.complete/CompleteRunTaskService.server"; import { formatError } from "~/utils/formatErrors.server"; import { safeJsonZodParse } from "~/utils/json"; +import { marqsv2 } from "~/v3/marqs/v2.server"; +import { AuthenticatedEnvironment } from "../apiAuth.server"; import { EndpointApi } from "../endpointApi.server"; import { createExecutionEvent } from "../executions/createExecutionEvent.server"; import { logger } from "../logger.server"; @@ -45,8 +42,6 @@ import { ResumeTaskService } from "../tasks/resumeTask.server"; import { executionWorker, workerQueue } from "../worker.server"; import { forceYieldCoordinator } from "./forceYieldCoordinator.server"; import { ResumeRunService } from "./resumeRun.server"; -import { executionRateLimiter } from "../runExecutionRateLimiter.server"; -import { env } from "~/env.server"; type FoundRun = NonNullable>>; type FoundTask = FoundRun["tasks"][number]; @@ -96,9 +91,10 @@ export class PerformRunExecutionV3Service { static async enqueue( run: JobRun & { version: JobVersion & { - environment: RuntimeEnvironment; + environment: AuthenticatedEnvironment; concurrencyLimitGroup?: ConcurrencyLimitGroup | null; }; + job: Job; }, priority: RunExecutionPriority, tx: PrismaClientOrTransaction, @@ -107,24 +103,28 @@ export class PerformRunExecutionV3Service { skipRetrying?: boolean; } = {} ) { - return await executionWorker.enqueue( - "performRunExecutionV3", - { - id: run.id, - reason: "EXECUTE_JOB", - }, - { - tx, - runAt: options.runAt, - jobKey: `job_run:EXECUTE_JOB:${run.id}`, - maxAttempts: options.skipRetrying ? env.DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined, - flags: executionRateLimiter?.flagsForRun(run, run.version) ?? [], - priority: priority === "initial" ? 0 : -1, - } + let queue = `job/${run.job.slug}`; + + if (run.version.concurrencyLimitGroup) { + queue = `group/${run.version.concurrencyLimitGroup.name}`; + } + + const runAt = + priority === "initial" ? options.runAt ?? new Date() : run.startedAt ?? run.createdAt; + + await marqsv2.enqueueMessage( + run.version.environment, + queue, + run.id, + { runId: run.id, attempt: 1 }, + undefined, + runAt.getTime() ); } static async dequeue(run: JobRun, tx: PrismaClientOrTransaction) { + await marqsv2.acknowledgeMessage(run.id); + await executionWorker.dequeue(`job_run:EXECUTE_JOB:${run.id}`, { tx, }); @@ -254,6 +254,8 @@ export class PerformRunExecutionV3Service { forceYieldCoordinator.deregisterRun(run.id); + await marqsv2.acknowledgeMessage(run.id); + //if the run has been canceled while it's being executed, we shouldn't do anything more const updatedRun = await this.#prismaClient.jobRun.findUnique({ select: { diff --git a/apps/webapp/app/services/runs/resumeRun.server.ts b/apps/webapp/app/services/runs/resumeRun.server.ts index 6b99035ff2e..6f8a158fe5b 100644 --- a/apps/webapp/app/services/runs/resumeRun.server.ts +++ b/apps/webapp/app/services/runs/resumeRun.server.ts @@ -147,9 +147,15 @@ async function findRun(prisma: PrismaClientOrTransaction, id: string) { return await prisma.jobRun.findUnique({ where: { id }, include: { + job: true, version: { include: { - environment: true, + environment: { + include: { + organization: true, + project: true, + }, + }, concurrencyLimitGroup: true, }, }, diff --git a/apps/webapp/app/services/tasks/resumeTask.server.ts b/apps/webapp/app/services/tasks/resumeTask.server.ts index 03a59b47323..865c9248c77 100644 --- a/apps/webapp/app/services/tasks/resumeTask.server.ts +++ b/apps/webapp/app/services/tasks/resumeTask.server.ts @@ -51,6 +51,9 @@ export class ResumeTaskService { logger.debug("ResumeTaskService.call resuming run execution", { parent: task.parent, taskId: task.id, + runId: task.run.id, + org: task.run.organizationId, + environment: task.run.environmentId, }); if (task.parent && task.parent.childExecutionMode === "PARALLEL") { diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index b50336b5410..2230ff62ab4 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -44,6 +44,7 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se import { PerformTaskOperationService } from "./tasks/performTaskOperation.server"; import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server"; import { ResumeTaskService } from "./tasks/resumeTask.server"; +import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server"; const workerCatalog = { indexEndpoint: z.object({ @@ -164,6 +165,9 @@ const workerCatalog = { "v3.retryAttempt": z.object({ runId: z.string(), }), + "v2.requeueMessage": z.object({ + runId: z.string(), + }), }; const executionWorkerCatalog = { @@ -622,6 +626,15 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, + "v2.requeueMessage": { + priority: 0, + maxAttempts: 5, + handler: async (payload, job) => { + const service = new RequeueV2Message(); + + await service.call(payload.runId); + }, + }, }, }); } diff --git a/apps/webapp/app/v3/friendlyIdentifiers.ts b/apps/webapp/app/v3/friendlyIdentifiers.ts index ebe5e7900a2..1036edf2974 100644 --- a/apps/webapp/app/v3/friendlyIdentifiers.ts +++ b/apps/webapp/app/v3/friendlyIdentifiers.ts @@ -2,6 +2,6 @@ import { customAlphabet } from "nanoid"; const idGenerator = customAlphabet("123456789abcdefghijkmnopqrstuvwxyz", 21); -export function generateFriendlyId(prefix: string) { - return `${prefix}_${idGenerator()}`; +export function generateFriendlyId(prefix: string, size?: number) { + return `${prefix}_${idGenerator(size)}`; } diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index b0d201dd3d6..017b6503667 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -1,4 +1,12 @@ -import { Span, SpanKind, SpanOptions, context, propagation, trace } from "@opentelemetry/api"; +import { + Span, + SpanKind, + SpanOptions, + Tracer, + context, + propagation, + trace, +} from "@opentelemetry/api"; import { SEMATTRS_MESSAGE_ID, SEMATTRS_MESSAGING_OPERATION, @@ -13,22 +21,20 @@ import { singleton } from "~/utils/singleton"; import { attributesFromAuthenticatedEnv } from "../tracer.server"; import { AsyncWorker } from "./asyncWorker.server"; import { MarQSShortKeyProducer } from "./marqsKeyProducer.server"; -import { SimpleWeightedChoiceStrategy } from "./priorityStrategy.server"; +import { SimpleWeightedChoiceStrategy } from "./simpleWeightedPriorityStrategy.server"; import { MarQSKeyProducer, MarQSQueuePriorityStrategy, MessagePayload, QueueCapacities, QueueRange, + VisibilityTimeoutStrategy, } from "./types"; -import { RequeueTaskRunService } from "../requeueTaskRun.server"; - -const tracer = trace.getTracer("marqs"); +import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server"; const KEY_PREFIX = "marqs:"; const constants = { - SHARED_QUEUE: "sharedQueue", MESSAGE_VISIBILITY_TIMEOUT_QUEUE: "msgVisibilityTimeout", } as const; @@ -40,6 +46,8 @@ const SemanticAttributes = { }; export type MarQSOptions = { + name: string; + tracer: Tracer; redis: RedisOptions; defaultEnvConcurrency: number; defaultOrgConcurrency: number; @@ -49,7 +57,9 @@ export type MarQSOptions = { keysProducer: MarQSKeyProducer; queuePriorityStrategy: MarQSQueuePriorityStrategy; envQueuePriorityStrategy: MarQSQueuePriorityStrategy; + visibilityTimeoutStrategy: VisibilityTimeoutStrategy; enableRebalancing?: boolean; + verbose?: boolean; }; /** @@ -74,6 +84,14 @@ export class MarQS { this.#registerCommands(); } + get name() { + return this.options.name; + } + + get tracer() { + return this.options.tracer; + } + public async updateQueueConcurrencyLimits( env: AuthenticatedEnvironment, queue: string, @@ -215,7 +233,8 @@ export class MarQS { const messageQueue = await this.#getRandomQueueFromParentQueue( parentQueue, this.options.envQueuePriorityStrategy, - (queue) => this.#calculateMessageQueueCapacities(queue) + (queue) => this.#calculateMessageQueueCapacities(queue), + env.id ); if (!messageQueue) { @@ -249,8 +268,9 @@ export class MarQS { [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, }); } else { - logger.error("Failed to read message, undoing the dequeueing of the message", { + logger.error(`Failed to read message, undoing the dequeueing of the message`, { messageData, + service: this.name, }); await this.#callAcknowledgeMessage({ @@ -265,9 +285,14 @@ export class MarQS { }); } - await RequeueTaskRunService.enqueue( + await this.options.visibilityTimeoutStrategy.heartbeat( messageData.messageId, - new Date(Date.now() + this.visibilityTimeoutInMs) + this.visibilityTimeoutInMs + ); + + await this.options.visibilityTimeoutStrategy.heartbeat( + messageData.messageId, + this.visibilityTimeoutInMs ); return message; @@ -284,10 +309,11 @@ export class MarQS { } public async getSharedQueueDetails() { - const parentQueue = constants.SHARED_QUEUE; + const parentQueue = this.keys.sharedQueueKey(); - const { range, selectionId } = await this.queuePriorityStrategy.nextCandidateSelection( - parentQueue + const { range } = await this.queuePriorityStrategy.nextCandidateSelection( + parentQueue, + "getSharedQueueDetails" ); const queues = await this.#getChildQueuesWithScores(parentQueue, range); @@ -299,11 +325,12 @@ export class MarQS { const choice = this.queuePriorityStrategy.chooseQueue( queuesWithScores, parentQueue, - selectionId + "getSharedQueueDetails", + range ); return { - selectionId, + selectionId: "getSharedQueueDetails", queues, queuesWithScores, nextRange: range, @@ -315,17 +342,18 @@ export class MarQS { /** * Dequeue a message from the shared queue (this should be used in production environments) */ - public async dequeueMessageInSharedQueue() { + public async dequeueMessageInSharedQueue(consumerId: string) { return this.#trace( "dequeueMessageInSharedQueue", async (span) => { - const parentQueue = constants.SHARED_QUEUE; + const parentQueue = this.keys.sharedQueueKey(); // Read the parent queue for matching queues const messageQueue = await this.#getRandomQueueFromParentQueue( parentQueue, this.options.queuePriorityStrategy, - (queue) => this.#calculateMessageQueueCapacities(queue) + (queue) => this.#calculateMessageQueueCapacities(queue), + consumerId ); if (!messageQueue) { @@ -351,6 +379,11 @@ export class MarQS { const message = await this.readMessage(messageData.messageId); + await this.options.visibilityTimeoutStrategy.heartbeat( + messageData.messageId, + this.visibilityTimeoutInMs + ); + if (message) { span.setAttributes({ [SEMATTRS_MESSAGE_ID]: message.messageId, @@ -395,7 +428,7 @@ export class MarQS { [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, }); - await RequeueTaskRunService.dequeue(messageId); + await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); await this.#callAcknowledgeMessage({ parentQueue: message.parentQueue, @@ -462,7 +495,7 @@ export class MarQS { return; } - await RequeueTaskRunService.dequeue(messageId); + await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); await this.#callAcknowledgeMessage({ parentQueue: oldMessage.parentQueue, @@ -493,27 +526,40 @@ export class MarQS { fn: (span: Span) => Promise, options?: SpanOptions & { sampleRate?: number } ): Promise { - return tracer.startActiveSpan(name, options ?? {}, async (span) => { - try { - return await fn(span); - } catch (e) { - if (e instanceof Error) { - span.recordException(e); - } else { - span.recordException(new Error(String(e))); + return this.tracer.startActiveSpan( + name, + { + ...options, + attributes: { + ...options?.attributes, + }, + }, + async (span) => { + try { + return await fn(span); + } catch (e) { + if (e instanceof Error) { + span.recordException(e); + } else { + span.recordException(new Error(String(e))); + } + + throw e; + } finally { + span.end(); } - - throw e; - } finally { - span.end(); } - }); + ); } /** * Negative acknowledge a message, which will requeue the message */ - public async nackMessage(messageId: string, retryAt: number = Date.now()) { + public async nackMessage( + messageId: string, + retryAt: number = Date.now(), + updates?: Record + ) { return this.#trace( "nackMessage", async (span) => { @@ -530,7 +576,11 @@ export class MarQS { [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, }); - await RequeueTaskRunService.dequeue(messageId); + if (updates) { + await this.replaceMessage(messageId, updates, retryAt, true); + } + + await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); await this.#callNackMessage({ messageKey: this.keys.messageKey(messageId), @@ -557,15 +607,7 @@ export class MarQS { // This should increment by the number of seconds, but with a max value of Date.now() + visibilityTimeoutInMs public async heartbeatMessage(messageId: string, seconds: number = 30) { - // We are still calling this for backwards compatibility, but we should be using the v3.requeueTaskRun job - await this.#callHeartbeatMessage({ - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, - messageId, - milliseconds: seconds * 1000, - maxVisibilityTimeout: Date.now() + this.visibilityTimeoutInMs, - }); - - await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + seconds * 1000)); + await this.options.visibilityTimeoutStrategy.heartbeat(messageId, seconds * 1000); } get visibilityTimeoutInMs() { @@ -585,9 +627,10 @@ export class MarQS { const message = MessagePayload.safeParse(JSON.parse(rawMessage)); if (!message.success) { - logger.error("Failed to parse message", { + logger.error(`[${this.name}] Failed to parse message`, { messageId, error: message.error, + service: this.name, }); return; @@ -609,13 +652,15 @@ export class MarQS { async #getRandomQueueFromParentQueue( parentQueue: string, queuePriorityStrategy: MarQSQueuePriorityStrategy, - calculateCapacities: (queue: string) => Promise + calculateCapacities: (queue: string) => Promise, + consumerId: string ) { return this.#trace( "getRandomQueueFromParentQueue", async (span) => { - const { range, selectionId } = await queuePriorityStrategy.nextCandidateSelection( - parentQueue + const { range } = await queuePriorityStrategy.nextCandidateSelection( + parentQueue, + consumerId ); const queues = await this.#getChildQueuesWithScores(parentQueue, range); @@ -626,7 +671,8 @@ export class MarQS { const choice = this.queuePriorityStrategy.chooseQueue( queuesWithScores, parentQueue, - selectionId + consumerId, + range ); span.setAttributes({ @@ -639,6 +685,28 @@ export class MarQS { span.setAttribute("nextRange.count", range.count); span.setAttribute("queueCount", queues.length); + if (this.options.verbose) { + if (typeof choice === "string") { + logger.debug(`[${this.name}] getRandomQueueFromParentQueue`, { + queues, + queuesWithScores, + nextRange: range, + queueCount: queues.length, + queueChoice: choice, + consumerId, + }); + } else { + logger.debug(`[${this.name}] getRandomQueueFromParentQueue`, { + queues, + queuesWithScores, + nextRange: range, + queueCount: queues.length, + noQueueChoice: true, + consumerId, + }); + } + } + if (typeof choice !== "string") { span.setAttribute("noQueueChoice", true); @@ -673,6 +741,7 @@ export class MarQS { queue: queue.value, capacities: await calculateCapacities(queue.value), age: now - queue.score, + size: await this.redis.zcard(queue.value), }; }) ); @@ -806,6 +875,7 @@ export class MarQS { pattern, component: "marqs", operation: "rebalanceParentQueues", + service: this.name, }); stream.on("data", async (keys) => { @@ -817,6 +887,7 @@ export class MarQS { component: "marqs", operation: "rebalanceParentQueues", parentQueues: uniqueKeys, + service: this.name, }); Promise.all( @@ -866,6 +937,7 @@ export class MarQS { childQueuesWithScores, component: "marqs", operation: "rebalanceParentQueues", + service: this.name, }); await Promise.all( @@ -894,6 +966,7 @@ export class MarQS { async #callEnqueueMessage(message: MessagePayload) { logger.debug("Calling enqueueMessage", { messagePayload: message, + service: this.name, }); return this.redis.enqueueMessage( @@ -949,6 +1022,7 @@ export class MarQS { logger.debug("Dequeue message result", { result, + service: this.name, }); if (result.length !== 2) { @@ -964,6 +1038,7 @@ export class MarQS { async #callReplaceMessage(message: MessagePayload) { logger.debug("Calling replaceMessage", { messagePayload: message, + service: this.name, }); return this.redis.replaceMessage( @@ -1000,6 +1075,7 @@ export class MarQS { orgConcurrencyKey, messageId, parentQueue, + service: this.name, }); return this.redis.acknowledgeMessage( @@ -1046,6 +1122,7 @@ export class MarQS { visibilityQueue, messageId, messageScore, + service: this.name, }); return this.redis.nackMessage( @@ -1063,28 +1140,6 @@ export class MarQS { ); } - /** - * @deprecated This is being replaced by the v3.requeueTaskRun graphile worker job - */ - #callHeartbeatMessage({ - visibilityQueue, - messageId, - milliseconds, - maxVisibilityTimeout, - }: { - visibilityQueue: string; - messageId: string; - milliseconds: number; - maxVisibilityTimeout: number; - }) { - return this.redis.heartbeatMessage( - visibilityQueue, - messageId, - String(milliseconds), - String(maxVisibilityTimeout) - ); - } - async #callCalculateMessageCapacities({ currentConcurrencyKey, currentEnvConcurrencyKey, @@ -1168,6 +1223,7 @@ export class MarQS { currentScore, rebalanceResult, operation: "rebalanceParentQueueChild", + service: this.name, }); } @@ -1603,7 +1659,10 @@ function getMarQSClient() { }; return new MarQS({ + name: "marqs", + tracer: trace.getTracer("marqs"), keysProducer: new MarQSShortKeyProducer(KEY_PREFIX), + visibilityTimeoutStrategy: new V3VisibilityTimeout(), queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 36 }), envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 12 }), workers: 1, diff --git a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts index e3a2601dd7a..d3fcfe21d31 100644 --- a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts +++ b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts @@ -58,6 +58,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { ].join(":"); } + return this.sharedQueueKey(); + } + + sharedQueueKey(): string { return constants.SHARED_QUEUE; } diff --git a/apps/webapp/app/v3/marqs/priorityStrategy.server.ts b/apps/webapp/app/v3/marqs/priorityStrategy.server.ts deleted file mode 100644 index cb488aa502d..00000000000 --- a/apps/webapp/app/v3/marqs/priorityStrategy.server.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { RedisOptions } from "ioredis"; -import { - MarQSQueuePriorityStrategy, - PriorityStrategyChoice, - QueueRange, - QueueWithScores, -} from "./types"; -import { nanoid } from "nanoid"; -import seedrandom from "seedrandom"; - -export type DynamicWeightedChoiceStrategyOptions = { - initialQueueSelectionSize: number; - redis: RedisOptions; -}; - -// This implementation of the priority strategy will "react" over time, giving more weight to queues that have been selected less frequently. -// It will also change the next candidate selection range based on if previous choices only had queues that were at capacity. -// Some other ideas: -// - Implement a "cooldown" period for queues that have been selected recently -// - Implement a "decay" for queues that have been selected recently -// -// The "memory" of this strategy is stored in Redis, to coordinate between multiple instances of the webapp (coming soon?) -export class DynamicWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { - constructor(private options: DynamicWeightedChoiceStrategyOptions) {} - - chooseQueue( - queues: QueueWithScores[], - parentQueue: string, - selectionId: string - ): PriorityStrategyChoice { - throw new Error("Method not implemented."); - } - - nextCandidateSelection(parentQueue: string): Promise<{ range: QueueRange; selectionId: string }> { - throw new Error("Method not implemented."); - } -} - -export type SimpleWeightedChoiceStrategyOptions = { - queueSelectionCount: number; - randomSeed?: string; -}; - -export class SimpleWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { - private _nextRangesByParentQueue: Map = new Map(); - private _randomGenerator = seedrandom(this.options.randomSeed); - - constructor(private options: SimpleWeightedChoiceStrategyOptions) {} - - private nextRangeForParentQueue(parentQueue: string): QueueRange { - return ( - this._nextRangesByParentQueue.get(parentQueue) ?? { - offset: 0, - count: this.options.queueSelectionCount, - } - ); - } - - chooseQueue( - queues: QueueWithScores[], - parentQueue: string, - selectionId: string - ): PriorityStrategyChoice { - const filteredQueues = filterQueuesAtCapacity(queues); - - if (queues.length === this.options.queueSelectionCount) { - const nextRangeForParentQueue = this.nextRangeForParentQueue(parentQueue); - const nextRange: QueueRange = nextRangeForParentQueue - ? { - offset: nextRangeForParentQueue.offset + this.options.queueSelectionCount, - count: this.options.queueSelectionCount, - } - : { offset: this.options.queueSelectionCount, count: this.options.queueSelectionCount }; - // If all queues are at capacity, and we were passed the max number of queues, then we will slide the window "to the right" - this._nextRangesByParentQueue.set(parentQueue, nextRange); - } else { - this._nextRangesByParentQueue.delete(parentQueue); - } - - if (filteredQueues.length === 0) { - return { abort: true }; - } - - const queueWeights = this.#calculateQueueWeights(filteredQueues); - - return weightedRandomChoice(queueWeights, this._randomGenerator()); - } - - async nextCandidateSelection( - parentQueue: string - ): Promise<{ range: QueueRange; selectionId: string }> { - return { range: this.nextRangeForParentQueue(parentQueue), selectionId: nanoid(24) }; - } - - // This function calculates the weight of each queue based on the age of the queue and the capacity of the queue, env, and org - // First, it normalizes the age, queue capacity, env capacity, and org capacity to a value between 0 and 1 based on the maximum value of each - // Then, it calculates the weight of each queue based on the following factors: - // - Age is 50% of the weight - // - Queue capacity is 30% of the weight - // - Env capacity is 10% of the weight - // - Org capacity is 10% of the weight - #calculateQueueWeights(queues: QueueWithScores[]) { - const maximumAge = Math.max(...queues.map((queue) => queue.age)); - const maximumQueueCapacity = Math.max( - ...queues.map((queue) => queue.capacities.queue.limit - queue.capacities.queue.current) - ); - const maximumEnvCapacity = Math.max( - ...queues.map((queue) => queue.capacities.env.limit - queue.capacities.env.current) - ); - const maximumOrgCapacity = Math.max( - ...queues.map((queue) => queue.capacities.org.limit - queue.capacities.org.current) - ); - - return queues.map(({ capacities, age, queue }) => { - const ageWeight = 0.5 * (age / maximumAge); - const queueWeight = - 0.3 * (1 - (capacities.queue.limit - capacities.queue.current) / maximumQueueCapacity); - const envWeight = - 0.1 * (1 - (capacities.env.limit - capacities.env.current) / maximumEnvCapacity); - const orgWeight = - 0.1 * (1 - (capacities.org.limit - capacities.org.current) / maximumOrgCapacity); - - return { - queue, - weight: ageWeight + queueWeight + envWeight + orgWeight, - }; - }); - } -} - -function filterQueuesAtCapacity(queues: QueueWithScores[]) { - return queues.filter( - (queue) => - queue.capacities.queue.current < queue.capacities.queue.limit && - queue.capacities.env.current < queue.capacities.env.limit && - queue.capacities.org.current < queue.capacities.org.limit - ); -} - -function weightedRandomChoice( - queues: Array<{ queue: string; weight: number }>, - randomNumber: number -) { - const totalWeight = queues.reduce((acc, queue) => acc + queue.weight, 0); - const randomNum = randomNumber * totalWeight; - let weightSum = 0; - - for (const queue of queues) { - weightSum += queue.weight; - if (randomNum <= weightSum) { - return queue.queue; - } - } - - return queues[queues.length - 1].queue; -} diff --git a/apps/webapp/app/v3/marqs/requeueV2Message.server.ts b/apps/webapp/app/v3/marqs/requeueV2Message.server.ts new file mode 100644 index 00000000000..3ab0cde52dc --- /dev/null +++ b/apps/webapp/app/v3/marqs/requeueV2Message.server.ts @@ -0,0 +1,27 @@ +import { logger } from "~/services/logger.server"; +import { marqs } from "~/v3/marqs/index.server"; + +import { BaseService } from "../services/baseService.server"; +import { PrismaClientOrTransaction } from "~/db.server"; +import { workerQueue } from "~/services/worker.server"; +import { marqsv2 } from "./v2.server"; + +export class RequeueV2Message extends BaseService { + public async call(runId: string) { + logger.debug("[RequeueV2Message] Requeueing task run", { runId }); + + marqsv2.nackMessage(runId); + } + + public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) { + return await workerQueue.enqueue( + "v2.requeueMessage", + { runId }, + { runAt, jobKey: `requeueV2Message:${runId}` } + ); + } + + public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) { + return await workerQueue.dequeue(`requeueV2Message:${runId}`, { tx }); + } +} diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 0f8f5f6c050..a9a82357123 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -90,6 +90,7 @@ export class SharedQueueConsumer { private _currentSpan: Span | undefined; private _endSpanInNextIteration = false; private _tasks = sharedQueueTasks; + private _id: string; constructor( private _sender: ZodMessageSender, @@ -101,6 +102,8 @@ export class SharedQueueConsumer { nextTickInterval: options.nextTickInterval ?? 1000, // 1 second interval: options.interval ?? 100, // 100ms }; + + this._id = generateFriendlyId("shared-queue", 6); } // This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it @@ -235,7 +238,7 @@ export class SharedQueueConsumer { // When the task run completes, ack the message // Using a heartbeat mechanism, if the client keeps responding with a heartbeat, we'll keep the message processing and increase the visibility timeout. - const message = await marqs?.dequeueMessageInSharedQueue(); + const message = await marqs?.dequeueMessageInSharedQueue(this._id); if (!message) { this.#doMoreWork(this._options.nextTickInterval); diff --git a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts new file mode 100644 index 00000000000..402f65f5b2b --- /dev/null +++ b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts @@ -0,0 +1,127 @@ +import { RedisOptions } from "ioredis"; +import { nanoid } from "nanoid"; +import { + MarQSQueuePriorityStrategy, + PriorityStrategyChoice, + QueueRange, + QueueWithScores, +} from "./types"; + +export type SimpleWeightedChoiceStrategyOptions = { + queueSelectionCount: number; + randomSeed?: string; + excludeEnvCapacity?: boolean; +}; + +export class SimpleWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { + private _nextRangesByParentQueue: Map = new Map(); + + constructor(private options: SimpleWeightedChoiceStrategyOptions) {} + + private nextRangeForParentQueue(parentQueue: string, consumerId: string): QueueRange { + return ( + this._nextRangesByParentQueue.get(`${consumerId}:${parentQueue}`) ?? { + offset: 0, + count: this.options.queueSelectionCount, + } + ); + } + + chooseQueue( + queues: QueueWithScores[], + parentQueue: string, + consumerId: string, + previousRange: QueueRange + ): PriorityStrategyChoice { + const filteredQueues = filterQueuesAtCapacity(queues); + + if (queues.length === this.options.queueSelectionCount) { + const nextRange: QueueRange = { + offset: previousRange.offset + this.options.queueSelectionCount, + count: this.options.queueSelectionCount, + }; + // If all queues are at capacity, and we were passed the max number of queues, then we will slide the window "to the right" + this._nextRangesByParentQueue.set(`${consumerId}:${parentQueue}`, nextRange); + } else { + this._nextRangesByParentQueue.delete(`${consumerId}:${parentQueue}`); + } + + if (filteredQueues.length === 0) { + return { abort: true }; + } + + const queueWeights = this.#calculateQueueWeights(filteredQueues); + + return weightedRandomChoice(queueWeights); + } + + async nextCandidateSelection( + parentQueue: string, + consumerId: string + ): Promise<{ range: QueueRange }> { + return { + range: this.nextRangeForParentQueue(parentQueue, consumerId), + }; + } + + #calculateQueueWeights(queues: QueueWithScores[]) { + const avgQueueSize = queues.reduce((acc, { size }) => acc + size, 0) / queues.length; + const avgMessageAge = queues.reduce((acc, { age }) => acc + age, 0) / queues.length; + + return queues.map(({ capacities, age, queue, size }) => { + let totalWeight = 1; + + if (size > avgQueueSize) { + totalWeight += Math.min(size / avgQueueSize, 2); + } + + if (age > avgMessageAge) { + totalWeight += Math.min(age / avgMessageAge); + } + + return { + queue, + totalWeight: age, + }; + }); + } +} + +function filterQueuesAtCapacity(queues: QueueWithScores[]) { + return queues.filter( + (queue) => + queue.capacities.queue.current < queue.capacities.queue.limit && + queue.capacities.env.current < queue.capacities.env.limit && + queue.capacities.org.current < queue.capacities.org.limit + ); +} + +function weightedRandomChoice(queues: Array<{ queue: string; totalWeight: number }>) { + const totalWeight = queues.reduce((acc, queue) => acc + queue.totalWeight, 0); + let randomNum = Math.random() * totalWeight; + + for (const queue of queues) { + if (randomNum < queue.totalWeight) { + return queue.queue; + } + + randomNum -= queue.totalWeight; + } + + // If we get here, we should just return a random queue + return queues[Math.floor(Math.random() * queues.length)].queue; +} + +export class NoopWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { + chooseQueue( + queues: QueueWithScores[], + parentQueue: string, + selectionId: string + ): PriorityStrategyChoice { + return { abort: true }; + } + + nextCandidateSelection(parentQueue: string): Promise<{ range: QueueRange; selectionId: string }> { + return Promise.resolve({ range: { offset: 0, count: 0 }, selectionId: nanoid(24) }); + } +} diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts index 68d22cf28e6..e8f03cb7c9b 100644 --- a/apps/webapp/app/v3/marqs/types.ts +++ b/apps/webapp/app/v3/marqs/types.ts @@ -16,6 +16,7 @@ export type QueueWithScores = { queue: string; capacities: QueueCapacities; age: number; + size: number; }; export type QueueRange = { offset: number; count: number }; @@ -26,6 +27,7 @@ export interface MarQSKeyProducer { orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string; queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string; envSharedQueueKey(env: AuthenticatedEnvironment): string; + sharedQueueKey(): string; sharedQueueScanPattern(): string; concurrencyLimitKeyFromQueue(queue: string): string; currentConcurrencyKeyFromQueue(queue: string): string; @@ -52,14 +54,15 @@ export interface MarQSQueuePriorityStrategy { * * @param queues * @param parentQueue - * @param selectionId + * @param consumerId * * @returns The queue to process the message from, or an object with `abort: true` if no queue is available */ chooseQueue( queues: Array, parentQueue: string, - selectionId: string + consumerId: string, + previousRange: QueueRange ): PriorityStrategyChoice; /** @@ -68,10 +71,11 @@ export interface MarQSQueuePriorityStrategy { * The `selectionId` is used to identify the selection and should be passed to chooseQueue * * @param parentQueue The parent queue that holds the candidate queues + * @param consumerId The consumerId that is making the request * * @returns The scores and the selectionId for the next candidate selection */ - nextCandidateSelection(parentQueue: string): Promise<{ range: QueueRange; selectionId: string }>; + nextCandidateSelection(parentQueue: string, consumerId: string): Promise<{ range: QueueRange }>; } export const MessagePayload = z.object({ @@ -85,3 +89,8 @@ export const MessagePayload = z.object({ }); export type MessagePayload = z.infer; + +export interface VisibilityTimeoutStrategy { + heartbeat(messageId: string, timeoutInMs: number): Promise; + cancelHeartbeat(messageId: string): Promise; +} diff --git a/apps/webapp/app/v3/marqs/v2.server.ts b/apps/webapp/app/v3/marqs/v2.server.ts new file mode 100644 index 00000000000..9a9d11e8405 --- /dev/null +++ b/apps/webapp/app/v3/marqs/v2.server.ts @@ -0,0 +1,314 @@ +import { trace } from "@opentelemetry/api"; +import { RetryOptions, calculateNextRetryDelay } from "@trigger.dev/core/v3"; +import { ConcurrencyLimitGroup, Job } from "@trigger.dev/database"; +import { z } from "zod"; +import { env } from "~/env.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { logger } from "~/services/logger.server"; +import { PerformRunExecutionV3Service } from "~/services/runs/performRunExecutionV3.server"; +import { singleton } from "~/utils/singleton"; +import { generateFriendlyId } from "../friendlyIdentifiers"; +import { MarQS } from "./index.server"; +import { MarQSShortKeyProducer } from "./marqsKeyProducer.server"; +import { RequeueV2Message } from "./requeueV2Message.server"; +import { + NoopWeightedChoiceStrategy, + SimpleWeightedChoiceStrategy, +} from "./simpleWeightedPriorityStrategy.server"; +import { VisibilityTimeoutStrategy } from "./types"; + +const KEY_PREFIX = "marqsv2:"; +const SHARED_QUEUE_NAME = "sharedQueue"; + +export class V2VisibilityTimeout implements VisibilityTimeoutStrategy { + async heartbeat(messageId: string, timeoutInMs: number): Promise { + RequeueV2Message.enqueue(messageId, new Date(Date.now() + timeoutInMs)); + } + + async cancelHeartbeat(messageId: string): Promise { + RequeueV2Message.dequeue(messageId); + } +} + +export class MarQSV2KeyProducer extends MarQSShortKeyProducer { + constructor(prefix: string) { + super(prefix); + } + + envSharedQueueKey(env: AuthenticatedEnvironment) { + return SHARED_QUEUE_NAME; + } + + sharedQueueKey(): string { + return SHARED_QUEUE_NAME; + } +} + +export const marqsv2 = singleton("marqsv2", getMarQSClient); + +function getMarQSClient() { + if (!env.REDIS_HOST || !env.REDIS_PORT) { + throw new Error( + "Could not initialize marqsv2 because process.env.REDIS_HOST and process.env.REDIS_PORT are required to be set. Trigger.dev v2 will not work without this." + ); + } + + const redisOptions = { + keyPrefix: KEY_PREFIX, + port: env.REDIS_PORT, + host: env.REDIS_HOST, + username: env.REDIS_USERNAME, + password: env.REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }; + + return new MarQS({ + verbose: env.V2_MARQS_VERBOSE === "1", + name: "marqsv2", + tracer: trace.getTracer("marqsv2"), + visibilityTimeoutStrategy: new V2VisibilityTimeout(), + keysProducer: new MarQSV2KeyProducer(KEY_PREFIX), + queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ + queueSelectionCount: env.V2_MARQS_QUEUE_SELECTION_COUNT, + }), + envQueuePriorityStrategy: new NoopWeightedChoiceStrategy(), // We don't use this in v2, since all queues go through the shared queue + workers: 1, + redis: redisOptions, + defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, // this is so we aren't limited by the environment concurrency + defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, + visibilityTimeoutInMs: env.V2_MARQS_VISIBILITY_TIMEOUT_MS, // 15 minutes + enableRebalancing: !env.MARQS_DISABLE_REBALANCING, + }); +} + +export type V2QueueConsumerOptions = { + pollInterval?: number; + retryOptions?: RetryOptions; +}; + +const MessageBody = z.object({ + version: z.literal("v1").default("v1"), + runId: z.string(), + attempt: z.number().default(1), +}); + +export class V2QueueConsumer { + private _enabled = false; + private _pollInterval: number; + private _retryOptions: RetryOptions = { + maxAttempts: 3, + factor: 2, + minTimeoutInMs: 1000, + maxTimeoutInMs: 60000, + randomize: true, + }; + private _id: string; + + constructor(private _options: V2QueueConsumerOptions = {}) { + this._pollInterval = this._options.pollInterval || 1000; + this._retryOptions = { + ...this._retryOptions, + ...this._options.retryOptions, + }; + this._id = generateFriendlyId("v2-consumer", 6); + } + + async start(startDelay: number = 0) { + if (this._enabled) { + return; + } + + this._enabled = true; + + // Only putting this here once does not actually delay the start of the consumer (for some reason) + await new Promise((resolve) => setTimeout(resolve, startDelay)); + await new Promise((resolve) => setTimeout(resolve, startDelay)); + + logger.debug(`[marqsv2] Starting V2QueueConsumer`, { + startDelay, + }); + + return this.#doWork().catch(console.error); + } + + async stop() { + if (!this._enabled) { + return; + } + + logger.debug("[marqsv2] Stopping V2QueueConsumer"); + + this._enabled = false; + } + + async #doWork() { + if (!this._enabled) { + return; + } + + await this.#doWorkInternal(); + } + + async #doWorkInternal() { + const message = await marqsv2.dequeueMessageInSharedQueue(this._id); + + if (!message) { + setTimeout(() => this.#doWork(), this._pollInterval); + return; + } + + const messageBody = MessageBody.safeParse(message.data); + + if (!messageBody.success) { + logger.error("[marqsv2] Failed to parse message", { + queueMessage: message.data, + error: messageBody.error, + }); + + await marqsv2.acknowledgeMessage(message.messageId); + + setTimeout(() => this.#doWork(), this._pollInterval); + return; + } + + logger.debug("[V2QueueConsumer] Received message", { + messageData: messageBody.data, + }); + + try { + const service = new PerformRunExecutionV3Service(); + + await service.call({ + id: messageBody.data.runId, + reason: "EXECUTE_JOB", + isRetry: false, + lastAttempt: false, + }); + } catch (error) { + logger.error("[marqsv2] Failed to execute job", { + runId: messageBody.data.runId, + error, + }); + + const attempt = messageBody.data.attempt + 1; + + const retryDelay = calculateNextRetryDelay(this._retryOptions, attempt); + + if (!retryDelay) { + logger.error("[marqsv2] Job failed after max attempts", { + runId: messageBody.data.runId, + attempt, + }); + + await marqsv2.acknowledgeMessage(message.messageId); + } else { + await marqsv2.nackMessage(message.messageId, Date.now() + retryDelay, { + attempt, + }); + } + } finally { + setTimeout(() => this.#doWork(), this._pollInterval); + } + } +} + +interface V2QueueConsumerPoolOptions { + poolSize: number; + pollInterval: number; +} + +class V2QueueConsumerPool { + #consumers: V2QueueConsumer[]; + #shuttingDown: boolean = false; + + constructor(private opts: V2QueueConsumerPoolOptions) { + this.#consumers = Array(opts.poolSize) + .fill(null) + .map((_, i) => new V2QueueConsumer({ pollInterval: opts.pollInterval })); + + process.on("SIGTERM", this.#handleSignal.bind(this)); + process.on("SIGINT", this.#handleSignal.bind(this)); + } + + async start() { + await Promise.allSettled( + this.#consumers.map((consumer, i) => + consumer.start(i * (this.opts.pollInterval / this.opts.poolSize)) + ) + ); + } + + async stop() { + await Promise.allSettled(this.#consumers.map((consumer) => consumer.stop())); + } + + async #handleSignal(signal: string) { + if (this.#shuttingDown) { + return; + } + + this.#shuttingDown = true; + + logger.debug(`[V2QueueConsumerPool] Received ${signal}, shutting down...`); + + this.stop().finally(() => { + logger.debug("V2QueueConsumerPool shutdown"); + }); + } +} + +export const v2QueueConsumerPool = singleton("v2QueueConsumerPool", initalizePool); + +async function initalizePool() { + if (env.V2_MARQS_CONSUMER_POOL_ENABLED === "0") { + return; + } + + console.log( + `🎱 Initializing V2QueueConsumerPool (poolSize=${env.V2_MARQS_CONSUMER_POOL_SIZE}, pollInterval=${env.V2_MARQS_CONSUMER_POLL_INTERVAL_MS})` + ); + + const pool = new V2QueueConsumerPool({ + poolSize: env.V2_MARQS_CONSUMER_POOL_SIZE, + pollInterval: env.V2_MARQS_CONSUMER_POLL_INTERVAL_MS, + }); + + await pool.start(); + + return pool; +} + +export async function putConcurrencyLimitGroup( + concurrencyLimitGroup: ConcurrencyLimitGroup, + env: AuthenticatedEnvironment +): Promise { + logger.debug(`[marqsv2] Updating concurrency limit group`, { + concurrencyLimitGroup, + environment: env, + }); + + await marqsv2.updateQueueConcurrencyLimits( + env, + `group/${concurrencyLimitGroup.name}`, + concurrencyLimitGroup.concurrencyLimit + ); +} + +export async function putJobConcurrencyLimit( + job: Job, + env: AuthenticatedEnvironment, + concurrency: number +): Promise { + logger.debug(`[marqsv2] Updating job concurrency limit`, { + job, + concurrency, + environment: env, + }); + + if (concurrency === 0) { + await marqsv2.removeQueueConcurrencyLimits(env, `job/${job.slug}`); + } else { + await marqsv2.updateQueueConcurrencyLimits(env, `job/${job.slug}`, concurrency); + } +} diff --git a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts new file mode 100644 index 00000000000..88494d01598 --- /dev/null +++ b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts @@ -0,0 +1,12 @@ +import { RequeueTaskRunService } from "../requeueTaskRun.server"; +import { VisibilityTimeoutStrategy } from "./types"; + +export class V3VisibilityTimeout implements VisibilityTimeoutStrategy { + async heartbeat(messageId: string, timeoutInMs: number): Promise { + await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + timeoutInMs)); + } + + async cancelHeartbeat(messageId: string): Promise { + await RequeueTaskRunService.dequeue(messageId); + } +} diff --git a/apps/webapp/test/marqs.test.ts b/apps/webapp/test/marqs.test.ts deleted file mode 100644 index 5458336cc21..00000000000 --- a/apps/webapp/test/marqs.test.ts +++ /dev/null @@ -1,316 +0,0 @@ -import { SimpleWeightedChoiceStrategy } from "../app/v3/marqs/priorityStrategy.server"; - -describe("SimpleWeightedChoiceStrategy", () => { - it("should use a weighted random choice algorithm to choose a queue", async () => { - const stategy = new SimpleWeightedChoiceStrategy({ - queueSelectionCount: 3, - randomSeed: "test", - }); - - const chosenQueue = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue3", - age: 12828, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue).toEqual("queue3"); - }); - - it("should filter out queues if any capacity is full", async () => { - const stategy = new SimpleWeightedChoiceStrategy({ - queueSelectionCount: 3, - randomSeed: "test", - }); - - const chosenQueue = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 10, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 10, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue3", - age: 12828, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 10, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue).toEqual({ abort: true }); - - const nextSelection = await stategy.nextCandidateSelection("parentQueue"); - - expect(nextSelection).toEqual({ - range: { offset: 3, count: 3 }, - selectionId: expect.any(String), - }); - - // Now pass some queues that have some capacity - const chosenQueue2 = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue3", - age: 12828, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue2).toEqual("queue3"); - - const nextSelection2 = await stategy.nextCandidateSelection("parentQueue"); - - expect(nextSelection2).toEqual({ - range: { offset: 6, count: 3 }, - selectionId: expect.any(String), - }); - }); - - it("should adjust the next filter range only if passed the maximum number of queues", async () => { - const stategy = new SimpleWeightedChoiceStrategy({ - queueSelectionCount: 3, - randomSeed: "test", - }); - - const chosenQueue = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 10, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 10, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue).toEqual({ abort: true }); - - const nextSelection = await stategy.nextCandidateSelection("parentQueue"); - - expect(nextSelection).toEqual({ - range: { offset: 0, count: 3 }, - selectionId: expect.any(String), - }); - }); - - it("should adjust the next candidate range ONLY for the matching parent queue", async () => { - const stategy = new SimpleWeightedChoiceStrategy({ - queueSelectionCount: 3, - randomSeed: "test", - }); - - const chosenQueue = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 10, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue3", - age: 12828, - capacities: { - queue: { current: 10, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue).toEqual("queue1"); - - const nextSelection = await stategy.nextCandidateSelection("parentQueue2"); - - expect(nextSelection).toEqual({ - range: { offset: 0, count: 3 }, - selectionId: expect.any(String), - }); - - const nextSelection2 = await stategy.nextCandidateSelection("parentQueue"); - - expect(nextSelection2).toEqual({ - range: { offset: 3, count: 3 }, - selectionId: expect.any(String), - }); - - const chosenQueue2 = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue3", - age: 12828, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue2).toEqual("queue2"); - - const nextSelection3 = await stategy.nextCandidateSelection("parentQueue"); - - expect(nextSelection3).toEqual({ - range: { offset: 6, count: 3 }, - selectionId: expect.any(String), - }); - - // Not passed 3 queues, so the range should be reset (we've reached the end) - const chosenQueue3 = stategy.chooseQueue( - [ - { - queue: "queue1", - age: 4497, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - { - queue: "queue2", - age: 19670, - capacities: { - queue: { current: 0, limit: 10 }, - env: { current: 0, limit: 10 }, - org: { current: 0, limit: 10 }, - }, - }, - ], - "parentQueue", - "selectionId" - ); - - expect(chosenQueue3).toEqual("queue2"); - - const nextSelection4 = await stategy.nextCandidateSelection("parentQueue"); - - expect(nextSelection4).toEqual({ - range: { offset: 0, count: 3 }, - selectionId: expect.any(String), - }); - }); -}); diff --git a/perf/src/index.ts b/perf/src/index.ts index 4083f9d9493..97785b8139f 100644 --- a/perf/src/index.ts +++ b/perf/src/index.ts @@ -1,5 +1,32 @@ +import { TriggerClient } from "@trigger.dev/sdk"; import { triggerClient } from "./trigger"; +const clients: TriggerClient[] = []; + +const possibleKeys = [ + "TRIGGER_API_KEY", + "TRIGGER_API_KEY_2", + "TRIGGER_API_KEY_3", + "TRIGGER_API_KEY_4", + "TRIGGER_API_KEY_5", + "TRIGGER_API_KEY_6", + "TRIGGER_API_KEY_7", + "TRIGGER_API_KEY_8", + "TRIGGER_API_KEY_9", +]; + +for (let i = 0; i < possibleKeys.length; i++) { + if (process.env[possibleKeys[i]]) { + clients.push( + new TriggerClient({ + id: "perf", + apiKey: process.env[possibleKeys[i]], + apiUrl: process.env.TRIGGER_API_URL, + }) + ); + } +} + async function sendEvent() { try { return await triggerClient.sendEvent({ @@ -47,7 +74,7 @@ async function sendEvents(count: number) { }, }); try { - return await triggerClient.sendEvents(events); + return await Promise.all(clients.reverse().map((client) => client.sendEvents(events))); } catch (err) { console.error(err); } diff --git a/perf/src/trigger.ts b/perf/src/trigger.ts index 672ee4d8058..c11d90b558d 100644 --- a/perf/src/trigger.ts +++ b/perf/src/trigger.ts @@ -59,6 +59,7 @@ triggerClient.defineJob({ trigger: eventTrigger({ name: "perf.test", }), + concurrencyLimit: 3, run: async (payload, io, ctx) => { await io.runTask( "task-1", @@ -98,6 +99,11 @@ triggerClient.defineJob({ }, }); +const concurrencyLimit = triggerClient.defineConcurrencyLimit({ + id: `test-shared`, + limit: 5, // Limit all jobs in this group to 5 concurrent executions +}); + triggerClient.defineJob({ id: `perf-test-3`, name: `Perf Test 3`, @@ -105,6 +111,7 @@ triggerClient.defineJob({ trigger: eventTrigger({ name: "perf.test", }), + concurrencyLimit, run: async (payload, io, ctx) => { await io.runTask( "task-1", @@ -143,3 +150,48 @@ triggerClient.defineJob({ ); }, }); + +triggerClient.defineJob({ + id: `perf-test-4`, + name: `Perf Test 4`, + version: "1.0.0", + trigger: eventTrigger({ + name: "perf.test", + }), + concurrencyLimit, + run: async (payload, io, ctx) => { + await io.runTask( + "task-1", + async (task) => { + await new Promise((resolve) => setTimeout(resolve, 1000)); + + return { + value: Math.random(), + }; + }, + { name: "task 1" } + ); + + await io.runTask( + "task-2", + async (task) => { + return { + value: Math.random(), + }; + }, + { name: "task 2" } + ); + + await io.runTask( + "task-3", + async (task) => { + await new Promise((resolve) => setTimeout(resolve, 1000)); + + return { + value: Math.random(), + }; + }, + { name: "task 3" } + ); + }, +}); diff --git a/references/job-catalog/src/stressTest.ts b/references/job-catalog/src/stressTest.ts index bec5ef1ebf1..fc042f5c463 100644 --- a/references/job-catalog/src/stressTest.ts +++ b/references/job-catalog/src/stressTest.ts @@ -48,11 +48,14 @@ client.defineJob({ await io.runTask( `task-1`, async (task) => { + await new Promise((resolve) => setTimeout(resolve, 10_000)); return { success: true }; }, { name: `Task 1` } ); + await io.wait("wait-1", 1); + await io.runTask( `task-2`, async (task) => { From 96b0dfcdede3664120c00b9a9bef0ef1ddf26184 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 10:58:42 +0100 Subject: [PATCH 2/8] Allow marqsv2 and v2 graphile to run in parallel --- apps/webapp/app/env.server.ts | 1 + apps/webapp/app/platform/zodWorker.server.ts | 14 +---- .../app/services/jobs/registerJob.server.ts | 14 ++++- .../runs/performRunExecutionV3.server.ts | 58 +++++++++++++------ .../app/v3/marqs/requeueV2Message.server.ts | 2 +- apps/webapp/app/v3/marqs/v2.server.ts | 34 ++++++----- .../migration.sql | 2 + packages/database/prisma/schema.prisma | 1 + {apps/webapp => scripts}/analyze_marqs.mjs | 0 9 files changed, 78 insertions(+), 48 deletions(-) create mode 100644 packages/database/prisma/migrations/20240606090155_add_v2_marqs_enabled_flag_on_org/migration.sql rename {apps/webapp => scripts}/analyze_marqs.mjs (100%) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 5f7697bbae9..dafbc8ff75c 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -174,6 +174,7 @@ const EnvironmentSchema = z.object({ MARQS_DISABLE_REBALANCING: z.coerce.boolean().default(false), VERBOSE_GRAPHILE_LOGGING: z.string().default("false"), + V2_MARQS_ENABLED: z.string().default("0"), V2_MARQS_CONSUMER_POOL_ENABLED: z.string().default("0"), V2_MARQS_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10), V2_MARQS_CONSUMER_POLL_INTERVAL_MS: z.coerce.number().int().default(1000), diff --git a/apps/webapp/app/platform/zodWorker.server.ts b/apps/webapp/app/platform/zodWorker.server.ts index 96e85007a83..0a3c75b9459 100644 --- a/apps/webapp/app/platform/zodWorker.server.ts +++ b/apps/webapp/app/platform/zodWorker.server.ts @@ -10,12 +10,7 @@ import type { TaskSpec, WorkerUtils, } from "graphile-worker"; -import { - run as graphileRun, - makeWorkerUtils, - parseCronItems, - Logger as GraphileLogger, -} from "graphile-worker"; +import { run as graphileRun, makeWorkerUtils, parseCronItems } from "graphile-worker"; import { SpanKind, trace } from "@opentelemetry/api"; import omit from "lodash.omit"; @@ -28,12 +23,6 @@ import { env } from "~/env.server"; const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1"); -const graphileLogger = new GraphileLogger((scope) => { - return (level, message, meta) => { - logger.debug(`[graphile-worker][${level}] ${message}`, { scope, meta }); - }; -}); - export interface MessageCatalogSchema { [key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion; } @@ -192,7 +181,6 @@ export class ZodWorker { taskList: this.#createTaskListFromTasks(), parsedCronItems, forbiddenFlags: this.#rateLimiter?.forbiddenFlags.bind(this.#rateLimiter), - logger: graphileLogger, }); if (!this.#runner) { diff --git a/apps/webapp/app/services/jobs/registerJob.server.ts b/apps/webapp/app/services/jobs/registerJob.server.ts index 25798583f08..72cc1d5978c 100644 --- a/apps/webapp/app/services/jobs/registerJob.server.ts +++ b/apps/webapp/app/services/jobs/registerJob.server.ts @@ -14,6 +14,7 @@ import { putConcurrencyLimitGroup, putJobConcurrencyLimit } from "~/v3/marqs/v2. import type { AuthenticatedEnvironment } from "../apiAuth.server"; import { logger } from "../logger.server"; import { RegisterScheduleSourceService } from "../schedules/registerScheduleSource.server"; +import { executionRateLimiter } from "../runExecutionRateLimiter.server"; export class RegisterJobService { #prismaClient: PrismaClient; @@ -174,11 +175,18 @@ export class RegisterJobService { try { if (jobVersion.concurrencyLimitGroup) { - // Upsert the maxSize for the concurrency limit group + // Upsert the maxSize for the concurrency limit group (marqs v2) await putConcurrencyLimitGroup(jobVersion.concurrencyLimitGroup, environment); - } else if (typeof jobVersion.concurrencyLimit === "number") { - await putJobConcurrencyLimit(job, environment, jobVersion.concurrencyLimit); + + // Upsert the maxSize for the concurrency limit group (legacy) + await executionRateLimiter?.putConcurrencyLimitGroup( + jobVersion.concurrencyLimitGroup, + environment + ); } + + await putJobConcurrencyLimit(job, jobVersion, environment); + await executionRateLimiter?.putJobVersionConcurrencyLimit(jobVersion, environment); } catch (error) { logger.error("Error setting concurrency limit", { error, diff --git a/apps/webapp/app/services/runs/performRunExecutionV3.server.ts b/apps/webapp/app/services/runs/performRunExecutionV3.server.ts index d5bdd9fa3cc..6aaff74b75a 100644 --- a/apps/webapp/app/services/runs/performRunExecutionV3.server.ts +++ b/apps/webapp/app/services/runs/performRunExecutionV3.server.ts @@ -26,6 +26,7 @@ import { RUN_CHUNK_EXECUTION_BUFFER, } from "~/consts"; import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server"; +import { env } from "~/env.server"; import { detectResponseIsTimeout } from "~/models/endpoint.server"; import { isRunCompleted } from "~/models/jobRun.server"; import { resolveRunConnections } from "~/models/runConnection.server"; @@ -38,6 +39,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server"; import { EndpointApi } from "../endpointApi.server"; import { createExecutionEvent } from "../executions/createExecutionEvent.server"; import { logger } from "../logger.server"; +import { executionRateLimiter } from "../runExecutionRateLimiter.server"; import { ResumeTaskService } from "../tasks/resumeTask.server"; import { executionWorker, workerQueue } from "../worker.server"; import { forceYieldCoordinator } from "./forceYieldCoordinator.server"; @@ -103,31 +105,49 @@ export class PerformRunExecutionV3Service { skipRetrying?: boolean; } = {} ) { - let queue = `job/${run.job.slug}`; + if (marqsv2 && run.version.environment.organization.v2MarqsEnabled) { + let queue = `job/${run.job.slug}`; - if (run.version.concurrencyLimitGroup) { - queue = `group/${run.version.concurrencyLimitGroup.name}`; - } + if (run.version.concurrencyLimitGroup) { + queue = `group/${run.version.concurrencyLimitGroup.name}`; + } - const runAt = - priority === "initial" ? options.runAt ?? new Date() : run.startedAt ?? run.createdAt; - - await marqsv2.enqueueMessage( - run.version.environment, - queue, - run.id, - { runId: run.id, attempt: 1 }, - undefined, - runAt.getTime() - ); + const runAt = + priority === "initial" ? options.runAt ?? new Date() : run.startedAt ?? run.createdAt; + + await marqsv2.enqueueMessage( + run.version.environment, + queue, + run.id, + { runId: run.id, attempt: 1 }, + undefined, + runAt.getTime() + ); + } else { + return await executionWorker.enqueue( + "performRunExecutionV3", + { + id: run.id, + reason: "EXECUTE_JOB", + }, + { + tx, + runAt: options.runAt, + jobKey: `job_run:EXECUTE_JOB:${run.id}`, + maxAttempts: options.skipRetrying ? env.DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined, + flags: executionRateLimiter?.flagsForRun(run, run.version) ?? [], + priority: priority === "initial" ? 0 : -1, + } + ); + } } static async dequeue(run: JobRun, tx: PrismaClientOrTransaction) { - await marqsv2.acknowledgeMessage(run.id); - await executionWorker.dequeue(`job_run:EXECUTE_JOB:${run.id}`, { tx, }); + + await marqsv2?.acknowledgeMessage(run.id); } async #executeJob(run: FoundRun, input: PerformRunExecutionV3Input, driftInMs: number = 0) { @@ -254,7 +274,9 @@ export class PerformRunExecutionV3Service { forceYieldCoordinator.deregisterRun(run.id); - await marqsv2.acknowledgeMessage(run.id); + if (marqsv2 && run.organization.v2MarqsEnabled) { + await marqsv2.acknowledgeMessage(run.id); + } //if the run has been canceled while it's being executed, we shouldn't do anything more const updatedRun = await this.#prismaClient.jobRun.findUnique({ diff --git a/apps/webapp/app/v3/marqs/requeueV2Message.server.ts b/apps/webapp/app/v3/marqs/requeueV2Message.server.ts index 3ab0cde52dc..cb6b4df1214 100644 --- a/apps/webapp/app/v3/marqs/requeueV2Message.server.ts +++ b/apps/webapp/app/v3/marqs/requeueV2Message.server.ts @@ -10,7 +10,7 @@ export class RequeueV2Message extends BaseService { public async call(runId: string) { logger.debug("[RequeueV2Message] Requeueing task run", { runId }); - marqsv2.nackMessage(runId); + marqsv2?.nackMessage(runId); } public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) { diff --git a/apps/webapp/app/v3/marqs/v2.server.ts b/apps/webapp/app/v3/marqs/v2.server.ts index 9a9d11e8405..ca39b1d2eca 100644 --- a/apps/webapp/app/v3/marqs/v2.server.ts +++ b/apps/webapp/app/v3/marqs/v2.server.ts @@ -1,6 +1,6 @@ import { trace } from "@opentelemetry/api"; import { RetryOptions, calculateNextRetryDelay } from "@trigger.dev/core/v3"; -import { ConcurrencyLimitGroup, Job } from "@trigger.dev/database"; +import { ConcurrencyLimitGroup, Job, JobVersion } from "@trigger.dev/database"; import { z } from "zod"; import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -47,6 +47,10 @@ export class MarQSV2KeyProducer extends MarQSShortKeyProducer { export const marqsv2 = singleton("marqsv2", getMarQSClient); function getMarQSClient() { + if (env.V2_MARQS_ENABLED === "0") { + return; + } + if (!env.REDIS_HOST || !env.REDIS_PORT) { throw new Error( "Could not initialize marqsv2 because process.env.REDIS_HOST and process.env.REDIS_PORT are required to be set. Trigger.dev v2 will not work without this." @@ -73,7 +77,7 @@ function getMarQSClient() { queueSelectionCount: env.V2_MARQS_QUEUE_SELECTION_COUNT, }), envQueuePriorityStrategy: new NoopWeightedChoiceStrategy(), // We don't use this in v2, since all queues go through the shared queue - workers: 1, + workers: 0, redis: redisOptions, defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, // this is so we aren't limited by the environment concurrency defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, @@ -151,7 +155,7 @@ export class V2QueueConsumer { } async #doWorkInternal() { - const message = await marqsv2.dequeueMessageInSharedQueue(this._id); + const message = await marqsv2?.dequeueMessageInSharedQueue(this._id); if (!message) { setTimeout(() => this.#doWork(), this._pollInterval); @@ -166,7 +170,7 @@ export class V2QueueConsumer { error: messageBody.error, }); - await marqsv2.acknowledgeMessage(message.messageId); + await marqsv2?.acknowledgeMessage(message.messageId); setTimeout(() => this.#doWork(), this._pollInterval); return; @@ -201,9 +205,9 @@ export class V2QueueConsumer { attempt, }); - await marqsv2.acknowledgeMessage(message.messageId); + await marqsv2?.acknowledgeMessage(message.messageId); } else { - await marqsv2.nackMessage(message.messageId, Date.now() + retryDelay, { + await marqsv2?.nackMessage(message.messageId, Date.now() + retryDelay, { attempt, }); } @@ -261,6 +265,10 @@ class V2QueueConsumerPool { export const v2QueueConsumerPool = singleton("v2QueueConsumerPool", initalizePool); async function initalizePool() { + if (env.V2_MARQS_ENABLED === "0") { + return; + } + if (env.V2_MARQS_CONSUMER_POOL_ENABLED === "0") { return; } @@ -288,7 +296,7 @@ export async function putConcurrencyLimitGroup( environment: env, }); - await marqsv2.updateQueueConcurrencyLimits( + await marqsv2?.updateQueueConcurrencyLimits( env, `group/${concurrencyLimitGroup.name}`, concurrencyLimitGroup.concurrencyLimit @@ -297,18 +305,18 @@ export async function putConcurrencyLimitGroup( export async function putJobConcurrencyLimit( job: Job, - env: AuthenticatedEnvironment, - concurrency: number + version: JobVersion, + env: AuthenticatedEnvironment ): Promise { logger.debug(`[marqsv2] Updating job concurrency limit`, { job, - concurrency, + version, environment: env, }); - if (concurrency === 0) { - await marqsv2.removeQueueConcurrencyLimits(env, `job/${job.slug}`); + if (typeof version.concurrencyLimit === "number" && version.concurrencyLimit > 0) { + await marqsv2?.updateQueueConcurrencyLimits(env, `job/${job.slug}`, version.concurrencyLimit); } else { - await marqsv2.updateQueueConcurrencyLimits(env, `job/${job.slug}`, concurrency); + await marqsv2?.removeQueueConcurrencyLimits(env, `job/${job.slug}`); } } diff --git a/packages/database/prisma/migrations/20240606090155_add_v2_marqs_enabled_flag_on_org/migration.sql b/packages/database/prisma/migrations/20240606090155_add_v2_marqs_enabled_flag_on_org/migration.sql new file mode 100644 index 00000000000..c441042fdda --- /dev/null +++ b/packages/database/prisma/migrations/20240606090155_add_v2_marqs_enabled_flag_on_org/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Organization" ADD COLUMN "v2MarqsEnabled" BOOLEAN NOT NULL DEFAULT false; diff --git a/packages/database/prisma/schema.prisma b/packages/database/prisma/schema.prisma index 2052f5b82c2..fd1b16c2640 100644 --- a/packages/database/prisma/schema.prisma +++ b/packages/database/prisma/schema.prisma @@ -122,6 +122,7 @@ model Organization { v3Enabled Boolean @default(false) v2Enabled Boolean @default(false) + v2MarqsEnabled Boolean @default(false) hasRequestedV3 Boolean @default(false) environments RuntimeEnvironment[] diff --git a/apps/webapp/analyze_marqs.mjs b/scripts/analyze_marqs.mjs similarity index 100% rename from apps/webapp/analyze_marqs.mjs rename to scripts/analyze_marqs.mjs From bb0a0cc158c558dd82ddf7c9372be3efe971f4ad Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 11:02:32 +0100 Subject: [PATCH 3/8] Fix missing GraphileLogger import --- apps/webapp/app/platform/zodWorker.server.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/platform/zodWorker.server.ts b/apps/webapp/app/platform/zodWorker.server.ts index 0a3c75b9459..4aaefe77fac 100644 --- a/apps/webapp/app/platform/zodWorker.server.ts +++ b/apps/webapp/app/platform/zodWorker.server.ts @@ -10,7 +10,12 @@ import type { TaskSpec, WorkerUtils, } from "graphile-worker"; -import { run as graphileRun, makeWorkerUtils, parseCronItems } from "graphile-worker"; +import { + run as graphileRun, + makeWorkerUtils, + parseCronItems, + Logger as GraphileLogger, +} from "graphile-worker"; import { SpanKind, trace } from "@opentelemetry/api"; import omit from "lodash.omit"; From c666996f2105de7c3a0a2464e1d3ca610afacd8b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 11:19:16 +0100 Subject: [PATCH 4/8] Fixed heartbeat after rebase --- apps/webapp/app/v3/marqs/index.server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 017b6503667..b2203d7c072 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -394,9 +394,9 @@ export class MarQS { }); } - await RequeueTaskRunService.enqueue( + await this.options.visibilityTimeoutStrategy.heartbeat( messageData.messageId, - new Date(Date.now() + this.visibilityTimeoutInMs) + this.visibilityTimeoutInMs ); return message; From ec31523cb8549f44c9e057c7b467b6fc3e02c0d9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 12:01:41 +0100 Subject: [PATCH 5/8] Replace postgres based run counters with redis ones with a backfill --- .../services/autoIncrementCounter.server.ts | 95 +++++++ .../app/services/runs/startRun.server.ts | 40 ++- .../app/v3/services/triggerTask.server.ts | 256 +++++++++--------- apps/webapp/test/placeholder.test.ts | 5 + references/v3-catalog/src/trigger/simple.ts | 2 +- 5 files changed, 246 insertions(+), 152 deletions(-) create mode 100644 apps/webapp/app/services/autoIncrementCounter.server.ts create mode 100644 apps/webapp/test/placeholder.test.ts diff --git a/apps/webapp/app/services/autoIncrementCounter.server.ts b/apps/webapp/app/services/autoIncrementCounter.server.ts new file mode 100644 index 00000000000..b5484af4e3a --- /dev/null +++ b/apps/webapp/app/services/autoIncrementCounter.server.ts @@ -0,0 +1,95 @@ +import Redis, { RedisOptions } from "ioredis"; +import { + $transaction, + Prisma, + PrismaClientOrTransaction, + PrismaTransactionOptions, + prisma, +} from "~/db.server"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; + +export type AutoIncrementCounterOptions = { + redis: RedisOptions; +}; + +export class AutoIncrementCounter { + private _redis: Redis; + + constructor(private options: AutoIncrementCounterOptions) { + this._redis = new Redis(options.redis); + } + + async incrementInTransaction( + key: string, + callback: (num: number, tx: PrismaClientOrTransaction) => Promise, + backfiller?: (key: string, db: PrismaClientOrTransaction) => Promise, + client: PrismaClientOrTransaction = prisma, + transactionOptions?: PrismaTransactionOptions + ): Promise { + let performedIncrement = false; + let performedBackfill = false; + + try { + return await $transaction( + client, + async (tx) => { + let newNumber = await this.#increment(key); + + performedIncrement = true; + + if (newNumber === 1 && backfiller) { + const backfilledNumber = await backfiller(key, tx); + + if (backfilledNumber && backfilledNumber > 1) { + newNumber = backfilledNumber + 1; + await this._redis.set(key, newNumber); + performedBackfill = true; + } + } + + return await callback(newNumber, tx); + }, + transactionOptions + ); + } catch (e) { + if ( + e instanceof Prisma.PrismaClientKnownRequestError || + e instanceof Prisma.PrismaClientUnknownRequestError || + e instanceof Prisma.PrismaClientValidationError + ) { + if (performedIncrement && !performedBackfill) { + await this._redis.decr(key); + } + } + + throw e; + } + } + + async #increment(key: string): Promise { + return await this._redis.incr(key); + } +} + +export const autoIncrementCounter = singleton("auto-increment-counter", getAutoIncrementCounter); + +function getAutoIncrementCounter() { + if (!env.REDIS_HOST || !env.REDIS_PORT) { + throw new Error( + "Could not initialize auto-increment counter because process.env.REDIS_HOST and process.env.REDIS_PORT are required to be set. " + ); + } + + return new AutoIncrementCounter({ + redis: { + keyPrefix: "auto-counter:", + port: env.REDIS_PORT, + host: env.REDIS_HOST, + username: env.REDIS_USERNAME, + password: env.REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + }); +} diff --git a/apps/webapp/app/services/runs/startRun.server.ts b/apps/webapp/app/services/runs/startRun.server.ts index 6dfb0a5ba6c..7b0e4433e49 100644 --- a/apps/webapp/app/services/runs/startRun.server.ts +++ b/apps/webapp/app/services/runs/startRun.server.ts @@ -4,11 +4,11 @@ import { type IntegrationConnection, } from "@trigger.dev/database"; import type { PrismaClient, PrismaClientOrTransaction } from "~/db.server"; -import { $transaction, prisma } from "~/db.server"; +import { prisma } from "~/db.server"; +import { autoIncrementCounter } from "../autoIncrementCounter.server"; +import { logger } from "../logger.server"; import { workerQueue } from "../worker.server"; import { ResumeRunService } from "./resumeRun.server"; -import { createHash } from "node:crypto"; -import { logger } from "../logger.server"; type FoundRun = NonNullable>>; type RunConnectionsByKey = Awaited>; @@ -67,22 +67,14 @@ export class StartRunService { : undefined ) .filter(Boolean); - const lockId = jobIdToLockId(run.jobId); - - await $transaction( - this.#prismaClient, - async (tx) => { - const counter = await tx.jobCounter.upsert({ - where: { jobId: run.jobId }, - update: { lastNumber: { increment: 1 } }, - create: { jobId: run.jobId, lastNumber: 1 }, - select: { lastNumber: true }, - }); - const updatedRun = await this.#prismaClient.jobRun.update({ + await autoIncrementCounter.incrementInTransaction( + `v2-run:${run.jobId}`, + async (num, tx) => { + const updatedRun = await tx.jobRun.update({ where: { id }, data: { - number: counter.lastNumber, + number: num, status: "QUEUED", queuedAt: new Date(), runConnections: { @@ -93,7 +85,16 @@ export class StartRunService { await ResumeRunService.enqueue(updatedRun, tx); }, - { timeout: 60000 } + async (_, tx) => { + const counter = await tx.jobCounter.findUnique({ + where: { jobId: run.jobId }, + select: { lastNumber: true }, + }); + + return counter?.lastNumber; + }, + this.#prismaClient, + { timeout: 10_000 } ); } @@ -242,8 +243,3 @@ async function createRunConnections(tx: PrismaClientOrTransaction, run: FoundRun function hasMissingConnections(runConnectionsByKey: RunConnectionsByKey) { return Object.values(runConnectionsByKey).some((connection) => connection.result === "missing"); } - -function jobIdToLockId(jobId: string): number { - // Convert jobId to a unique lock identifier - return parseInt(createHash("sha256").update(jobId).digest("hex").slice(0, 8), 16); -} diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 9c5c0dba6da..ff8c6be57af 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -4,9 +4,9 @@ import { TriggerTaskRequestBody, packetRequiresOffloading, } from "@trigger.dev/core/v3"; -import { createHash } from "node:crypto"; -import { $transaction, prisma } from "~/db.server"; +import { prisma } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; import { eventRepository } from "../eventRepository.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; @@ -84,148 +84,151 @@ export class TriggerTaskService extends BaseService { environment ); - const lockId = taskIdentifierToLockId(taskId); - - const run = await $transaction(this._prisma, async (tx) => { - const lockedToBackgroundWorker = body.options?.lockToVersion - ? await tx.backgroundWorker.findUnique({ - where: { - projectId_runtimeEnvironmentId_version: { - projectId: environment.projectId, - runtimeEnvironmentId: environment.id, - version: body.options?.lockToVersion, + const run = await autoIncrementCounter.incrementInTransaction( + `v3-run:${environment.id}:${taskId}`, + async (num, tx) => { + const lockedToBackgroundWorker = body.options?.lockToVersion + ? await tx.backgroundWorker.findUnique({ + where: { + projectId_runtimeEnvironmentId_version: { + projectId: environment.projectId, + runtimeEnvironmentId: environment.id, + version: body.options?.lockToVersion, + }, }, - }, - }) - : undefined; + }) + : undefined; + + let queueName = sanitizeQueueName(body.options?.queue?.name ?? `task/${taskId}`); + + // Check that the queuename is not an empty string + if (!queueName) { + queueName = sanitizeQueueName(`task/${taskId}`); + } + + event.setAttribute("queueName", queueName); + span.setAttribute("queueName", queueName); - const counter = await tx.taskRunNumberCounter.upsert({ - where: { - taskIdentifier_environmentId: { + const taskRun = await tx.taskRun.create({ + data: { + status: "PENDING", + number: num, + friendlyId: runFriendlyId, + runtimeEnvironmentId: environment.id, + projectId: environment.projectId, + idempotencyKey, taskIdentifier: taskId, - environmentId: environment.id, + payload: payloadPacket.data ?? "", + payloadType: payloadPacket.dataType, + context: body.context, + traceContext: traceContext, + traceId: event.traceId, + spanId: event.spanId, + lockedToVersionId: lockedToBackgroundWorker?.id, + concurrencyKey: body.options?.concurrencyKey, + queue: queueName, + isTest: body.options?.test ?? false, }, - }, - update: { lastNumber: { increment: 1 } }, - create: { taskIdentifier: taskId, environmentId: environment.id, lastNumber: 1 }, - select: { lastNumber: true }, - }); - - let queueName = sanitizeQueueName(body.options?.queue?.name ?? `task/${taskId}`); - - // Check that the queuename is not an empty string - if (!queueName) { - queueName = sanitizeQueueName(`task/${taskId}`); - } - - event.setAttribute("queueName", queueName); - span.setAttribute("queueName", queueName); - - const taskRun = await tx.taskRun.create({ - data: { - status: "PENDING", - number: counter.lastNumber, - friendlyId: runFriendlyId, - runtimeEnvironmentId: environment.id, - projectId: environment.projectId, - idempotencyKey, - taskIdentifier: taskId, - payload: payloadPacket.data ?? "", - payloadType: payloadPacket.dataType, - context: body.context, - traceContext: traceContext, - traceId: event.traceId, - spanId: event.spanId, - lockedToVersionId: lockedToBackgroundWorker?.id, - concurrencyKey: body.options?.concurrencyKey, - queue: queueName, - isTest: body.options?.test ?? false, - }, - }); - - if (payloadPacket.data) { - if ( - payloadPacket.dataType === "application/json" || - payloadPacket.dataType === "application/super+json" - ) { - event.setAttribute("payload", JSON.parse(payloadPacket.data) as any); - } else { - event.setAttribute("payload", payloadPacket.data); - } + }); - event.setAttribute("payloadType", payloadPacket.dataType); - } + if (payloadPacket.data) { + if ( + payloadPacket.dataType === "application/json" || + payloadPacket.dataType === "application/super+json" + ) { + event.setAttribute("payload", JSON.parse(payloadPacket.data) as any); + } else { + event.setAttribute("payload", payloadPacket.data); + } + + event.setAttribute("payloadType", payloadPacket.dataType); + } - event.setAttribute("runId", taskRun.friendlyId); - span.setAttribute("runId", taskRun.friendlyId); + event.setAttribute("runId", taskRun.friendlyId); + span.setAttribute("runId", taskRun.friendlyId); - if (body.options?.dependentAttempt) { - const dependentAttempt = await tx.taskRunAttempt.findUnique({ - where: { friendlyId: body.options.dependentAttempt }, - }); + if (body.options?.dependentAttempt) { + const dependentAttempt = await tx.taskRunAttempt.findUnique({ + where: { friendlyId: body.options.dependentAttempt }, + }); - if (dependentAttempt) { - await tx.taskRunDependency.create({ - data: { - taskRunId: taskRun.id, - dependentAttemptId: dependentAttempt.id, - }, + if (dependentAttempt) { + await tx.taskRunDependency.create({ + data: { + taskRunId: taskRun.id, + dependentAttemptId: dependentAttempt.id, + }, + }); + } + } else if (body.options?.dependentBatch) { + const dependentBatchRun = await tx.batchTaskRun.findUnique({ + where: { friendlyId: body.options.dependentBatch }, }); + + if (dependentBatchRun) { + await tx.taskRunDependency.create({ + data: { + taskRunId: taskRun.id, + dependentBatchRunId: dependentBatchRun.id, + }, + }); + } } - } else if (body.options?.dependentBatch) { - const dependentBatchRun = await tx.batchTaskRun.findUnique({ - where: { friendlyId: body.options.dependentBatch }, - }); - if (dependentBatchRun) { - await tx.taskRunDependency.create({ - data: { - taskRunId: taskRun.id, - dependentBatchRunId: dependentBatchRun.id, + if (body.options?.queue) { + const concurrencyLimit = body.options.queue.concurrencyLimit + ? Math.max(0, body.options.queue.concurrencyLimit) + : null; + const taskQueue = await prisma.taskQueue.upsert({ + where: { + runtimeEnvironmentId_name: { + runtimeEnvironmentId: environment.id, + name: queueName, + }, + }, + update: { + concurrencyLimit, + rateLimit: body.options.queue.rateLimit, + }, + create: { + friendlyId: generateFriendlyId("queue"), + name: queueName, + concurrencyLimit, + runtimeEnvironmentId: environment.id, + projectId: environment.projectId, + rateLimit: body.options.queue.rateLimit, + type: "NAMED", }, }); + + if (typeof taskQueue.concurrencyLimit === "number") { + await marqs?.updateQueueConcurrencyLimits( + environment, + taskQueue.name, + taskQueue.concurrencyLimit + ); + } else { + await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); + } } - } - if (body.options?.queue) { - const concurrencyLimit = body.options.queue.concurrencyLimit - ? Math.max(0, body.options.queue.concurrencyLimit) - : null; - const taskQueue = await prisma.taskQueue.upsert({ + return taskRun; + }, + async (_, tx) => { + const counter = await tx.taskRunNumberCounter.findUnique({ where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: environment.id, - name: queueName, + taskIdentifier_environmentId: { + taskIdentifier: taskId, + environmentId: environment.id, }, }, - update: { - concurrencyLimit, - rateLimit: body.options.queue.rateLimit, - }, - create: { - friendlyId: generateFriendlyId("queue"), - name: queueName, - concurrencyLimit, - runtimeEnvironmentId: environment.id, - projectId: environment.projectId, - rateLimit: body.options.queue.rateLimit, - type: "NAMED", - }, + select: { lastNumber: true }, }); - if (typeof taskQueue.concurrencyLimit === "number") { - await marqs?.updateQueueConcurrencyLimits( - environment, - taskQueue.name, - taskQueue.concurrencyLimit - ); - } else { - await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); - } - } - - return taskRun; - }); + return counter?.lastNumber; + }, + this._prisma + ); if (!run) { return; @@ -286,8 +289,3 @@ export class TriggerTaskService extends BaseService { return { dataType: payloadType }; } } - -function taskIdentifierToLockId(taskIdentifier: string): number { - // Convert taskIdentifier to a unique lock identifier - return parseInt(createHash("sha256").update(taskIdentifier).digest("hex").slice(0, 8), 16); -} diff --git a/apps/webapp/test/placeholder.test.ts b/apps/webapp/test/placeholder.test.ts new file mode 100644 index 00000000000..361cb08e466 --- /dev/null +++ b/apps/webapp/test/placeholder.test.ts @@ -0,0 +1,5 @@ +describe("Placeholder", () => { + it("should pass", () => { + expect(true).toBe(true); + }); +}); diff --git a/references/v3-catalog/src/trigger/simple.ts b/references/v3-catalog/src/trigger/simple.ts index 0dcab6d5ec4..844c680b1bf 100644 --- a/references/v3-catalog/src/trigger/simple.ts +++ b/references/v3-catalog/src/trigger/simple.ts @@ -10,7 +10,7 @@ export const simplestTask = task({ body: JSON.stringify({ hello: "world", taskId: "fetch-post-task", - foo: "barrrrrrrrrrrrrrrrrrrrrr", + foo: "barrrrrrrrrrrrrrrrrrrrrrr", }), }); From f01a79144b29e0bdca13781f12d3217e137e4578 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 13:10:20 +0100 Subject: [PATCH 6/8] Add back in the graphile logger --- apps/webapp/app/platform/zodWorker.server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/webapp/app/platform/zodWorker.server.ts b/apps/webapp/app/platform/zodWorker.server.ts index 4aaefe77fac..8df928e528c 100644 --- a/apps/webapp/app/platform/zodWorker.server.ts +++ b/apps/webapp/app/platform/zodWorker.server.ts @@ -186,6 +186,7 @@ export class ZodWorker { taskList: this.#createTaskListFromTasks(), parsedCronItems, forbiddenFlags: this.#rateLimiter?.forbiddenFlags.bind(this.#rateLimiter), + logger: graphileLogger, }); if (!this.#runner) { From f3a71271ec364743c7ec9bd85e67531ef583ebef Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 13:38:18 +0100 Subject: [PATCH 7/8] Remove duplicate visibility timeout calls --- apps/webapp/app/v3/marqs/index.server.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index b2203d7c072..3baf3218710 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -290,11 +290,6 @@ export class MarQS { this.visibilityTimeoutInMs ); - await this.options.visibilityTimeoutStrategy.heartbeat( - messageData.messageId, - this.visibilityTimeoutInMs - ); - return message; }, { @@ -379,11 +374,6 @@ export class MarQS { const message = await this.readMessage(messageData.messageId); - await this.options.visibilityTimeoutStrategy.heartbeat( - messageData.messageId, - this.visibilityTimeoutInMs - ); - if (message) { span.setAttributes({ [SEMATTRS_MESSAGE_ID]: message.messageId, From 373206da382d9ebba6ac5de94d4ea0bc3d94ee45 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Jun 2024 13:55:04 +0100 Subject: [PATCH 8/8] Clamp simple weighted strategy to max of 5 --- .../app/v3/marqs/simpleWeightedPriorityStrategy.server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts index 402f65f5b2b..c2e42a47954 100644 --- a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts +++ b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts @@ -72,11 +72,11 @@ export class SimpleWeightedChoiceStrategy implements MarQSQueuePriorityStrategy let totalWeight = 1; if (size > avgQueueSize) { - totalWeight += Math.min(size / avgQueueSize, 2); + totalWeight += Math.min(size / avgQueueSize, 4); } if (age > avgMessageAge) { - totalWeight += Math.min(age / avgMessageAge); + totalWeight += Math.min(age / avgMessageAge, 4); } return {