@@ -4,12 +4,13 @@ import {
44 TaskRunExecution ,
55 TaskRunExecutionResult ,
66} from "@trigger.dev/core/v3" ;
7- import { $transaction } from "~/db.server" ;
7+ import { $transaction , PrismaClientOrTransaction } from "~/db.server" ;
88import { logger } from "~/services/logger.server" ;
99import { marqs } from "~/v3/marqs/index.server" ;
1010import { socketIo } from "../handleSocketIo.server" ;
1111import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server" ;
1212import { BaseService } from "./baseService.server" ;
13+ import { TaskRunAttempt } from "@trigger.dev/database" ;
1314
1415export class ResumeAttemptService extends BaseService {
1516 public async call (
@@ -24,7 +25,7 @@ export class ResumeAttemptService extends BaseService {
2425 } ,
2526 include : {
2627 taskRun : true ,
27- taskRunDependency : {
28+ dependencies : {
2829 include : {
2930 taskRun : {
3031 include : {
@@ -40,8 +41,12 @@ export class ResumeAttemptService extends BaseService {
4041 } ,
4142 } ,
4243 } ,
44+ orderBy : {
45+ createdAt : "desc" ,
46+ } ,
47+ take : 1 ,
4348 } ,
44- batchTaskRunDependency : {
49+ batchDependencies : {
4550 include : {
4651 items : {
4752 include : {
@@ -61,6 +66,10 @@ export class ResumeAttemptService extends BaseService {
6166 } ,
6267 } ,
6368 } ,
69+ orderBy : {
70+ createdAt : "desc" ,
71+ } ,
72+ take : 1 ,
6473 } ,
6574 } ,
6675 } ) ;
@@ -78,6 +87,8 @@ export class ResumeAttemptService extends BaseService {
7887 return ;
7988 }
8089
90+ let completedAttemptIds : string [ ] = [ ] ;
91+
8192 switch ( params . type ) {
8293 case "WAIT_FOR_DURATION" : {
8394 logger . error (
@@ -93,148 +104,140 @@ export class ResumeAttemptService extends BaseService {
93104 } ) ;
94105 break ;
95106 }
96- case "WAIT_FOR_TASK" :
97- case "WAIT_FOR_BATCH" : {
98- let completedAttemptIds : string [ ] = [ ] ;
99-
100- if ( attempt . taskRunDependency ) {
101- const dependentAttempt = attempt . taskRunDependency . taskRun . attempts [ 0 ] ;
107+ case "WAIT_FOR_TASK" : {
108+ if ( attempt . dependencies . length ) {
109+ // We only care about the latest dependency
110+ const dependentAttempt = attempt . dependencies [ 0 ] . taskRun . attempts [ 0 ] ;
102111
103112 if ( ! dependentAttempt ) {
104113 logger . error ( "No dependent attempt" , { attemptId : attempt . id } ) ;
105114 return ;
106115 }
107116
108117 completedAttemptIds = [ dependentAttempt . id ] ;
109-
110- await tx . taskRunAttempt . update ( {
111- where : {
112- id : attempt . id ,
113- } ,
114- data : {
115- taskRunDependency : {
116- disconnect : true ,
117- } ,
118- } ,
119- } ) ;
120- } else if ( attempt . batchTaskRunDependency ) {
121- const dependentBatchItems = attempt . batchTaskRunDependency . items ;
118+ } else {
119+ logger . error ( "No task dependency" , { attemptId : attempt . id } ) ;
120+ return ;
121+ }
122+ break ;
123+ }
124+ case "WAIT_FOR_BATCH" : {
125+ if ( attempt . batchDependencies ) {
126+ // We only care about the latest batch dependency
127+ const dependentBatchItems = attempt . batchDependencies [ 0 ] . items ;
122128
123129 if ( ! dependentBatchItems ) {
124130 logger . error ( "No dependent batch items" , { attemptId : attempt . id } ) ;
125131 return ;
126132 }
127133
128134 completedAttemptIds = dependentBatchItems . map ( ( item ) => item . taskRun . attempts [ 0 ] ?. id ) ;
129-
130- await tx . taskRunAttempt . update ( {
131- where : {
132- id : attempt . id ,
133- } ,
134- data : {
135- batchTaskRunDependency : {
136- disconnect : true ,
137- } ,
138- } ,
139- } ) ;
140135 } else {
141- logger . error ( "No dependencies" , { attemptId : attempt . id } ) ;
142- return ;
143- }
144-
145- if ( completedAttemptIds . length === 0 ) {
146- logger . error ( "No completed attempt IDs" , { attemptId : attempt . id } ) ;
136+ logger . error ( "No batch dependency" , { attemptId : attempt . id } ) ;
147137 return ;
148138 }
139+ break ;
140+ }
141+ default : {
142+ break ;
143+ }
144+ }
149145
150- const completions : TaskRunExecutionResult [ ] = [ ] ;
151- const executions : TaskRunExecution [ ] = [ ] ;
146+ await this . #handleDependencyResume( attempt , completedAttemptIds , tx ) ;
147+ } ) ;
148+ }
152149
153- for ( const completedAttemptId of completedAttemptIds ) {
154- const completedAttempt = await tx . taskRunAttempt . findUnique ( {
155- where : {
156- id : completedAttemptId ,
157- taskRun : {
158- lockedAt : {
159- not : null ,
160- } ,
161- lockedById : {
162- not : null ,
163- } ,
164- } ,
165- } ,
166- } ) ;
167-
168- if ( ! completedAttempt ) {
169- logger . error ( "Completed attempt not found" , {
170- attemptId : attempt . id ,
171- completedAttemptId,
172- } ) ;
173- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
174- return ;
175- }
150+ async #handleDependencyResume(
151+ attempt : TaskRunAttempt ,
152+ completedAttemptIds : string [ ] ,
153+ tx : PrismaClientOrTransaction
154+ ) {
155+ if ( completedAttemptIds . length === 0 ) {
156+ logger . error ( "No completed attempt IDs" , { attemptId : attempt . id } ) ;
157+ return ;
158+ }
159+
160+ const completions : TaskRunExecutionResult [ ] = [ ] ;
161+ const executions : TaskRunExecution [ ] = [ ] ;
162+
163+ for ( const completedAttemptId of completedAttemptIds ) {
164+ const completedAttempt = await tx . taskRunAttempt . findUnique ( {
165+ where : {
166+ id : completedAttemptId ,
167+ taskRun : {
168+ lockedAt : {
169+ not : null ,
170+ } ,
171+ lockedById : {
172+ not : null ,
173+ } ,
174+ } ,
175+ } ,
176+ } ) ;
176177
177- const completion = await sharedQueueTasks . getCompletionPayloadFromAttempt (
178- completedAttempt . id
179- ) ;
178+ if ( ! completedAttempt ) {
179+ logger . error ( "Completed attempt not found" , {
180+ attemptId : attempt . id ,
181+ completedAttemptId,
182+ } ) ;
183+ await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
184+ return ;
185+ }
180186
181- if ( ! completion ) {
182- logger . error ( "Failed to get completion payload" , {
183- attemptId : attempt . id ,
184- completedAttemptId,
185- } ) ;
186- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
187- return ;
188- }
187+ const completion = await sharedQueueTasks . getCompletionPayloadFromAttempt (
188+ completedAttempt . id
189+ ) ;
189190
190- completions . push ( completion ) ;
191+ if ( ! completion ) {
192+ logger . error ( "Failed to get completion payload" , {
193+ attemptId : attempt . id ,
194+ completedAttemptId,
195+ } ) ;
196+ await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
197+ return ;
198+ }
191199
192- const executionPayload = await sharedQueueTasks . getExecutionPayloadFromAttempt (
193- completedAttempt . id
194- ) ;
200+ completions . push ( completion ) ;
195201
196- if ( ! executionPayload ) {
197- logger . error ( "Failed to get execution payload" , {
198- attemptId : attempt . id ,
199- completedAttemptId,
200- } ) ;
201- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
202- return ;
203- }
202+ const executionPayload = await sharedQueueTasks . getExecutionPayloadFromAttempt (
203+ completedAttempt . id
204+ ) ;
204205
205- executions . push ( executionPayload . execution ) ;
206- }
206+ if ( ! executionPayload ) {
207+ logger . error ( "Failed to get execution payload" , {
208+ attemptId : attempt . id ,
209+ completedAttemptId,
210+ } ) ;
211+ await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
212+ return ;
213+ }
207214
208- const updated = await tx . taskRunAttempt . update ( {
209- where : {
210- id : attempt . id ,
211- } ,
215+ executions . push ( executionPayload . execution ) ;
216+ }
217+
218+ const updated = await tx . taskRunAttempt . update ( {
219+ where : {
220+ id : attempt . id ,
221+ } ,
222+ data : {
223+ status : "EXECUTING" ,
224+ taskRun : {
225+ update : {
212226 data : {
213- status : "EXECUTING" ,
214- taskRun : {
215- update : {
216- data : {
217- status : attempt . number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING" ,
218- } ,
219- } ,
220- } ,
227+ status : attempt . number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING" ,
221228 } ,
222- } ) ;
229+ } ,
230+ } ,
231+ } ,
232+ } ) ;
223233
224- socketIo . coordinatorNamespace . emit ( "RESUME_AFTER_DEPENDENCY" , {
225- version : "v1" ,
226- runId : attempt . taskRunId ,
227- attemptId : attempt . id ,
228- attemptFriendlyId : attempt . friendlyId ,
229- completions,
230- executions,
231- } ) ;
232- break ;
233- }
234- default : {
235- break ;
236- }
237- }
234+ socketIo . coordinatorNamespace . emit ( "RESUME_AFTER_DEPENDENCY" , {
235+ version : "v1" ,
236+ runId : attempt . taskRunId ,
237+ attemptId : attempt . id ,
238+ attemptFriendlyId : attempt . friendlyId ,
239+ completions,
240+ executions,
238241 } ) ;
239242 }
240243}
0 commit comments