@@ -16,12 +16,7 @@ import {
1616 supportsFeature ,
1717} from "@trigger.dev/core" ;
1818import { BloomFilter } from "@trigger.dev/core-backend" ;
19- import {
20- ConcurrencyLimitGroup ,
21- JobRun ,
22- JobVersion ,
23- RuntimeEnvironment ,
24- } from "@trigger.dev/database" ;
19+ import { ConcurrencyLimitGroup , Job , JobRun , JobVersion } from "@trigger.dev/database" ;
2520import { generateErrorMessage } from "zod-error" ;
2621import { eventRecordToApiJson } from "~/api.server" ;
2722import {
@@ -31,22 +26,24 @@ import {
3126 RUN_CHUNK_EXECUTION_BUFFER ,
3227} from "~/consts" ;
3328import { $transaction , PrismaClient , PrismaClientOrTransaction , prisma } from "~/db.server" ;
29+ import { env } from "~/env.server" ;
3430import { detectResponseIsTimeout } from "~/models/endpoint.server" ;
3531import { isRunCompleted } from "~/models/jobRun.server" ;
3632import { resolveRunConnections } from "~/models/runConnection.server" ;
3733import { prepareTasksForCaching , prepareTasksForCachingLegacy } from "~/models/task.server" ;
3834import { CompleteRunTaskService } from "~/routes/api.v1.runs.$runId.tasks.$id.complete/CompleteRunTaskService.server" ;
3935import { formatError } from "~/utils/formatErrors.server" ;
4036import { safeJsonZodParse } from "~/utils/json" ;
37+ import { marqsv2 } from "~/v3/marqs/v2.server" ;
38+ import { AuthenticatedEnvironment } from "../apiAuth.server" ;
4139import { EndpointApi } from "../endpointApi.server" ;
4240import { createExecutionEvent } from "../executions/createExecutionEvent.server" ;
4341import { logger } from "../logger.server" ;
42+ import { executionRateLimiter } from "../runExecutionRateLimiter.server" ;
4443import { ResumeTaskService } from "../tasks/resumeTask.server" ;
4544import { executionWorker , workerQueue } from "../worker.server" ;
4645import { forceYieldCoordinator } from "./forceYieldCoordinator.server" ;
4746import { ResumeRunService } from "./resumeRun.server" ;
48- import { executionRateLimiter } from "../runExecutionRateLimiter.server" ;
49- import { env } from "~/env.server" ;
5047
5148type FoundRun = NonNullable < Awaited < ReturnType < typeof findRun > > > ;
5249type FoundTask = FoundRun [ "tasks" ] [ number ] ;
@@ -96,9 +93,10 @@ export class PerformRunExecutionV3Service {
9693 static async enqueue (
9794 run : JobRun & {
9895 version : JobVersion & {
99- environment : RuntimeEnvironment ;
96+ environment : AuthenticatedEnvironment ;
10097 concurrencyLimitGroup ?: ConcurrencyLimitGroup | null ;
10198 } ;
99+ job : Job ;
102100 } ,
103101 priority : RunExecutionPriority ,
104102 tx : PrismaClientOrTransaction ,
@@ -107,27 +105,49 @@ export class PerformRunExecutionV3Service {
107105 skipRetrying ?: boolean ;
108106 } = { }
109107 ) {
110- return await executionWorker . enqueue (
111- "performRunExecutionV3" ,
112- {
113- id : run . id ,
114- reason : "EXECUTE_JOB" ,
115- } ,
116- {
117- tx,
118- runAt : options . runAt ,
119- jobKey : `job_run:EXECUTE_JOB:${ run . id } ` ,
120- maxAttempts : options . skipRetrying ? env . DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined ,
121- flags : executionRateLimiter ?. flagsForRun ( run , run . version ) ?? [ ] ,
122- priority : priority === "initial" ? 0 : - 1 ,
108+ if ( marqsv2 && run . version . environment . organization . v2MarqsEnabled ) {
109+ let queue = `job/${ run . job . slug } ` ;
110+
111+ if ( run . version . concurrencyLimitGroup ) {
112+ queue = `group/${ run . version . concurrencyLimitGroup . name } ` ;
123113 }
124- ) ;
114+
115+ const runAt =
116+ priority === "initial" ? options . runAt ?? new Date ( ) : run . startedAt ?? run . createdAt ;
117+
118+ await marqsv2 . enqueueMessage (
119+ run . version . environment ,
120+ queue ,
121+ run . id ,
122+ { runId : run . id , attempt : 1 } ,
123+ undefined ,
124+ runAt . getTime ( )
125+ ) ;
126+ } else {
127+ return await executionWorker . enqueue (
128+ "performRunExecutionV3" ,
129+ {
130+ id : run . id ,
131+ reason : "EXECUTE_JOB" ,
132+ } ,
133+ {
134+ tx,
135+ runAt : options . runAt ,
136+ jobKey : `job_run:EXECUTE_JOB:${ run . id } ` ,
137+ maxAttempts : options . skipRetrying ? env . DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS : undefined ,
138+ flags : executionRateLimiter ?. flagsForRun ( run , run . version ) ?? [ ] ,
139+ priority : priority === "initial" ? 0 : - 1 ,
140+ }
141+ ) ;
142+ }
125143 }
126144
127145 static async dequeue ( run : JobRun , tx : PrismaClientOrTransaction ) {
128146 await executionWorker . dequeue ( `job_run:EXECUTE_JOB:${ run . id } ` , {
129147 tx,
130148 } ) ;
149+
150+ await marqsv2 ?. acknowledgeMessage ( run . id ) ;
131151 }
132152
133153 async #executeJob( run : FoundRun , input : PerformRunExecutionV3Input , driftInMs : number = 0 ) {
@@ -254,6 +274,10 @@ export class PerformRunExecutionV3Service {
254274
255275 forceYieldCoordinator . deregisterRun ( run . id ) ;
256276
277+ if ( marqsv2 && run . organization . v2MarqsEnabled ) {
278+ await marqsv2 . acknowledgeMessage ( run . id ) ;
279+ }
280+
257281 //if the run has been canceled while it's being executed, we shouldn't do anything more
258282 const updatedRun = await this . #prismaClient. jobRun . findUnique ( {
259283 select : {
0 commit comments