Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ const EnvironmentSchema = z.object({
MARQS_DISABLE_REBALANCING: z.coerce.boolean().default(false),

VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
V2_MARQS_ENABLED: z.string().default("0"),
V2_MARQS_CONSUMER_POOL_ENABLED: z.string().default("0"),
V2_MARQS_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
V2_MARQS_CONSUMER_POLL_INTERVAL_MS: z.coerce.number().int().default(1000),
V2_MARQS_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
V2_MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
.int()
.default(60 * 1000 * 15),
V2_MARQS_DEFAULT_ENV_CONCURRENCY: z.coerce.number().int().default(100),
V2_MARQS_VERBOSE: z.string().default("0"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
8 changes: 5 additions & 3 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,18 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

this.#shuttingDown = true;

this.#logDebug(
`Received ${signal}, shutting down zodWorker with timeout ${this.#shutdownTimeoutInMs}ms`
);

if (this.#shutdownTimeoutInMs) {
setTimeout(() => {
this.#logDebug("Shutdown timeout reached, exiting process");
this.#logDebug(`Shutdown timeout of ${this.#shutdownTimeoutInMs} reached, exiting process`);

process.exit(0);
}, this.#shutdownTimeoutInMs);
}

this.#logDebug(`Received ${signal}, shutting down zodWorker...`);

this.stop().finally(() => {
this.#logDebug("zodWorker stopped");
});
Expand Down
95 changes: 95 additions & 0 deletions apps/webapp/app/services/autoIncrementCounter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import Redis, { RedisOptions } from "ioredis";
import {
$transaction,
Prisma,
PrismaClientOrTransaction,
PrismaTransactionOptions,
prisma,
} from "~/db.server";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";

/** Construction options for `AutoIncrementCounter`. */
export type AutoIncrementCounterOptions = {
// ioredis connection options; passed unchanged to `new Redis(...)` by the counter's constructor.
redis: RedisOptions;
};

/**
 * Redis-backed counter that allocates the next number for a key and runs a
 * caller-supplied callback with that number inside a database transaction.
 */
export class AutoIncrementCounter {
  private _redis: Redis;

  /**
   * @param options Counter options; `options.redis` is passed to the ioredis constructor.
   */
  constructor(private options: AutoIncrementCounterOptions) {
    this._redis = new Redis(options.redis);
  }

  /**
   * Increments the Redis counter stored at `key` and invokes `callback` with the
   * new number inside a `$transaction` on `client`.
   *
   * When the increment yields `1` (the key had no prior value in Redis) and a
   * `backfiller` is supplied, the backfiller is asked for the last number that
   * was already used. If it reports a number of at least 1, the counter is
   * moved to `last + 1` and that value is written back to Redis.
   *
   * If the transaction fails with a Prisma request or validation error, the
   * increment is undone with a `DECR` unless a backfill was written. Other
   * error types leave the increment in place. The error is always rethrown.
   *
   * @param key Counter key (the Redis client may add its own key prefix).
   * @param callback Work to run with the allocated number and the transaction client.
   * @param backfiller Optional lookup of the last number already used for `key`.
   * @param client Prisma client or transaction to run on; defaults to the shared `prisma`.
   * @param transactionOptions Options forwarded to `$transaction`.
   * @returns Whatever `$transaction` resolves with for the callback's result.
   */
  async incrementInTransaction<T>(
    key: string,
    callback: (num: number, tx: PrismaClientOrTransaction) => Promise<T>,
    backfiller?: (key: string, db: PrismaClientOrTransaction) => Promise<number | undefined>,
    client: PrismaClientOrTransaction = prisma,
    transactionOptions?: PrismaTransactionOptions
  ): Promise<T | undefined> {
    let performedIncrement = false;
    let performedBackfill = false;

    try {
      return await $transaction(
        client,
        async (tx) => {
          let newNumber = await this.#increment(key);

          performedIncrement = true;

          // A result of 1 means Redis had no value for this key yet, so the
          // existing numbering (if any) has to be recovered from the backfiller.
          if (newNumber === 1 && backfiller) {
            const backfilledNumber = await backfiller(key, tx);

            // Any stored last number >= 1 must be honoured. Using `> 1` here
            // would hand out 1 again when exactly one number was used before.
            if (typeof backfilledNumber === "number" && backfilledNumber >= 1) {
              newNumber = backfilledNumber + 1;
              await this._redis.set(key, newNumber);
              performedBackfill = true;
            }
          }

          return await callback(newNumber, tx);
        },
        transactionOptions
      );
    } catch (e) {
      if (
        e instanceof Prisma.PrismaClientKnownRequestError ||
        e instanceof Prisma.PrismaClientUnknownRequestError ||
        e instanceof Prisma.PrismaClientValidationError
      ) {
        // Give the number back only when it came from a plain increment; a
        // backfilled value written with SET is left untouched.
        if (performedIncrement && !performedBackfill) {
          await this._redis.decr(key);
        }
      }

      throw e;
    }
  }

  /** Atomically increments the counter at `key` and resolves with the new value. */
  async #increment(key: string): Promise<number> {
    return await this._redis.incr(key);
  }
}

/**
 * Shared counter instance, obtained through the `singleton` helper under the
 * name "auto-increment-counter" with `getAutoIncrementCounter` as its factory.
 * NOTE(review): presumably `singleton` creates the instance once and reuses it — confirm in ~/utils/singleton.
 */
export const autoIncrementCounter = singleton("auto-increment-counter", getAutoIncrementCounter);

/**
 * Builds an `AutoIncrementCounter` from the Redis settings found in `env`.
 *
 * @throws Error when `REDIS_HOST` or `REDIS_PORT` is not set.
 */
function getAutoIncrementCounter() {
  const { REDIS_HOST: host, REDIS_PORT: port } = env;

  if (!host || !port) {
    throw new Error(
      "Could not initialize auto-increment counter because process.env.REDIS_HOST and process.env.REDIS_PORT are required to be set. "
    );
  }

  // TLS is on unless it is explicitly disabled through REDIS_TLS_DISABLED="true".
  const tlsOptions = env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} };

  return new AutoIncrementCounter({
    redis: {
      keyPrefix: "auto-counter:",
      port,
      host,
      username: env.REDIS_USERNAME,
      password: env.REDIS_PASSWORD,
      enableAutoPipelining: true,
      ...tlsOptions,
    },
  });
}
8 changes: 6 additions & 2 deletions apps/webapp/app/services/jobs/registerJob.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import {
assertExhaustive,
} from "@trigger.dev/core";
import type { Endpoint, Integration, Job, JobIntegration, JobVersion } from "@trigger.dev/database";
import { DEFAULT_MAX_CONCURRENT_RUNS } from "~/consts";
import type { PrismaClient } from "~/db.server";
import { prisma } from "~/db.server";
import { ExtendedEndpoint, findEndpoint } from "~/models/endpoint.server";
import type { RuntimeEnvironment } from "~/models/runtimeEnvironment.server";
import { putConcurrencyLimitGroup, putJobConcurrencyLimit } from "~/v3/marqs/v2.server";
import type { AuthenticatedEnvironment } from "../apiAuth.server";
import { logger } from "../logger.server";
import { RegisterScheduleSourceService } from "../schedules/registerScheduleSource.server";
Expand Down Expand Up @@ -175,13 +175,17 @@ export class RegisterJobService {

try {
if (jobVersion.concurrencyLimitGroup) {
// Upsert the maxSize for the concurrency limit group
// Upsert the maxSize for the concurrency limit group (marqs v2)
await putConcurrencyLimitGroup(jobVersion.concurrencyLimitGroup, environment);

// Upsert the maxSize for the concurrency limit group (legacy)
await executionRateLimiter?.putConcurrencyLimitGroup(
jobVersion.concurrencyLimitGroup,
environment
);
}

await putJobConcurrencyLimit(job, jobVersion, environment);
await executionRateLimiter?.putJobVersionConcurrencyLimit(jobVersion, environment);
} catch (error) {
logger.error("Error setting concurrency limit", {
Expand Down
70 changes: 47 additions & 23 deletions apps/webapp/app/services/runs/performRunExecutionV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ import {
supportsFeature,
} from "@trigger.dev/core";
import { BloomFilter } from "@trigger.dev/core-backend";
import {
ConcurrencyLimitGroup,
JobRun,
JobVersion,
RuntimeEnvironment,
} from "@trigger.dev/database";
import { ConcurrencyLimitGroup, Job, JobRun, JobVersion } from "@trigger.dev/database";
import { generateErrorMessage } from "zod-error";
import { eventRecordToApiJson } from "~/api.server";
import {
Expand All @@ -31,22 +26,24 @@ import {
RUN_CHUNK_EXECUTION_BUFFER,
} from "~/consts";
import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server";
import { env } from "~/env.server";
import { detectResponseIsTimeout } from "~/models/endpoint.server";
import { isRunCompleted } from "~/models/jobRun.server";
import { resolveRunConnections } from "~/models/runConnection.server";
import { prepareTasksForCaching, prepareTasksForCachingLegacy } from "~/models/task.server";
import { CompleteRunTaskService } from "~/routes/api.v1.runs.$runId.tasks.$id.complete/CompleteRunTaskService.server";
import { formatError } from "~/utils/formatErrors.server";
import { safeJsonZodParse } from "~/utils/json";
import { marqsv2 } from "~/v3/marqs/v2.server";
import { AuthenticatedEnvironment } from "../apiAuth.server";
import { EndpointApi } from "../endpointApi.server";
import { createExecutionEvent } from "../executions/createExecutionEvent.server";
import { logger } from "../logger.server";
import { executionRateLimiter } from "../runExecutionRateLimiter.server";
import { ResumeTaskService } from "../tasks/resumeTask.server";
import { executionWorker, workerQueue } from "../worker.server";
import { forceYieldCoordinator } from "./forceYieldCoordinator.server";
import { ResumeRunService } from "./resumeRun.server";
import { executionRateLimiter } from "../runExecutionRateLimiter.server";
import { env } from "~/env.server";

type FoundRun = NonNullable<Awaited<ReturnType<typeof findRun>>>;
type FoundTask = FoundRun["tasks"][number];
Expand Down Expand Up @@ -96,9 +93,10 @@ export class PerformRunExecutionV3Service {
static async enqueue(
run: JobRun & {
version: JobVersion & {
environment: RuntimeEnvironment;
environment: AuthenticatedEnvironment;
concurrencyLimitGroup?: ConcurrencyLimitGroup | null;
};
job: Job;
},
priority: RunExecutionPriority,
tx: PrismaClientOrTransaction,
Expand All @@ -107,27 +105,49 @@ export class PerformRunExecutionV3Service {
skipRetrying?: boolean;
} = {}
) {
return await executionWorker.enqueue(
"performRunExecutionV3",
{
id: run.id,
reason: "EXECUTE_JOB",
},
{
tx,
runAt: options.runAt,
jobKey: `job_run:EXECUTE_JOB:${run.id}`,
maxAttempts: options.skipRetrying ? env.DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined,
flags: executionRateLimiter?.flagsForRun(run, run.version) ?? [],
priority: priority === "initial" ? 0 : -1,
if (marqsv2 && run.version.environment.organization.v2MarqsEnabled) {
let queue = `job/${run.job.slug}`;

if (run.version.concurrencyLimitGroup) {
queue = `group/${run.version.concurrencyLimitGroup.name}`;
}
);

const runAt =
priority === "initial" ? options.runAt ?? new Date() : run.startedAt ?? run.createdAt;

await marqsv2.enqueueMessage(
run.version.environment,
queue,
run.id,
{ runId: run.id, attempt: 1 },
undefined,
runAt.getTime()
);
} else {
return await executionWorker.enqueue(
"performRunExecutionV3",
{
id: run.id,
reason: "EXECUTE_JOB",
},
{
tx,
runAt: options.runAt,
jobKey: `job_run:EXECUTE_JOB:${run.id}`,
maxAttempts: options.skipRetrying ? env.DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined,
flags: executionRateLimiter?.flagsForRun(run, run.version) ?? [],
priority: priority === "initial" ? 0 : -1,
}
);
}
}

/**
 * Removes the pending execution for `run`: first dequeues its
 * `job_run:EXECUTE_JOB:<id>` job from the execution worker using `tx`, then
 * acknowledges the run's message on `marqsv2` when that queue is available.
 */
static async dequeue(run: JobRun, tx: PrismaClientOrTransaction) {
  const jobKey = `job_run:EXECUTE_JOB:${run.id}`;
  const dequeueOptions = { tx };

  await executionWorker.dequeue(jobKey, dequeueOptions);

  await marqsv2?.acknowledgeMessage(run.id);
}

async #executeJob(run: FoundRun, input: PerformRunExecutionV3Input, driftInMs: number = 0) {
Expand Down Expand Up @@ -254,6 +274,10 @@ export class PerformRunExecutionV3Service {

forceYieldCoordinator.deregisterRun(run.id);

if (marqsv2 && run.organization.v2MarqsEnabled) {
await marqsv2.acknowledgeMessage(run.id);
}

//if the run has been canceled while it's being executed, we shouldn't do anything more
const updatedRun = await this.#prismaClient.jobRun.findUnique({
select: {
Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/services/runs/resumeRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,15 @@ async function findRun(prisma: PrismaClientOrTransaction, id: string) {
return await prisma.jobRun.findUnique({
where: { id },
include: {
job: true,
version: {
include: {
environment: true,
environment: {
include: {
organization: true,
project: true,
},
},
concurrencyLimitGroup: true,
},
},
Expand Down
40 changes: 18 additions & 22 deletions apps/webapp/app/services/runs/startRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import {
type IntegrationConnection,
} from "@trigger.dev/database";
import type { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { $transaction, prisma } from "~/db.server";
import { prisma } from "~/db.server";
import { autoIncrementCounter } from "../autoIncrementCounter.server";
import { logger } from "../logger.server";
import { workerQueue } from "../worker.server";
import { ResumeRunService } from "./resumeRun.server";
import { createHash } from "node:crypto";
import { logger } from "../logger.server";

type FoundRun = NonNullable<Awaited<ReturnType<typeof findRun>>>;
type RunConnectionsByKey = Awaited<ReturnType<typeof createRunConnections>>;
Expand Down Expand Up @@ -67,22 +67,14 @@ export class StartRunService {
: undefined
)
.filter(Boolean);
const lockId = jobIdToLockId(run.jobId);

await $transaction(
this.#prismaClient,
async (tx) => {
const counter = await tx.jobCounter.upsert({
where: { jobId: run.jobId },
update: { lastNumber: { increment: 1 } },
create: { jobId: run.jobId, lastNumber: 1 },
select: { lastNumber: true },
});

const updatedRun = await this.#prismaClient.jobRun.update({
await autoIncrementCounter.incrementInTransaction(
`v2-run:${run.jobId}`,
async (num, tx) => {
const updatedRun = await tx.jobRun.update({
where: { id },
data: {
number: counter.lastNumber,
number: num,
status: "QUEUED",
queuedAt: new Date(),
runConnections: {
Expand All @@ -93,7 +85,16 @@ export class StartRunService {

await ResumeRunService.enqueue(updatedRun, tx);
},
{ timeout: 60000 }
async (_, tx) => {
const counter = await tx.jobCounter.findUnique({
where: { jobId: run.jobId },
select: { lastNumber: true },
});

return counter?.lastNumber;
},
this.#prismaClient,
{ timeout: 10_000 }
);
}

Expand Down Expand Up @@ -242,8 +243,3 @@ async function createRunConnections(tx: PrismaClientOrTransaction, run: FoundRun
/** Returns true when at least one entry in the map has a `result` of "missing". */
function hasMissingConnections(runConnectionsByKey: RunConnectionsByKey) {
  for (const connection of Object.values(runConnectionsByKey)) {
    if (connection.result === "missing") {
      return true;
    }
  }

  return false;
}

/**
 * Derives a numeric lock identifier from a job id: the leading four bytes of
 * the id's SHA-256 digest, read as an unsigned 32-bit integer.
 */
function jobIdToLockId(jobId: string): number {
  const digest = createHash("sha256").update(jobId).digest();

  // A big-endian read of bytes 0-3 equals parsing the first 8 hex characters.
  return digest.readUInt32BE(0);
}
3 changes: 3 additions & 0 deletions apps/webapp/app/services/tasks/resumeTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ export class ResumeTaskService {
logger.debug("ResumeTaskService.call resuming run execution", {
parent: task.parent,
taskId: task.id,
runId: task.run.id,
org: task.run.organizationId,
environment: task.run.environmentId,
});

if (task.parent && task.parent.childExecutionMode === "PARALLEL") {
Expand Down
Loading