@@ -8,9 +8,90 @@ import { PrismaClientOrTransaction } from "~/db.server";
88import { workerQueue } from "~/services/worker.server" ;
99
1010export class RequeueTaskRunService extends BaseService {
11- public async call ( runId : string ) { }
11+ public async call ( runId : string ) {
12+ const taskRun = await this . _prisma . taskRun . findUnique ( {
13+ where : { id : runId } ,
14+ } ) ;
1215
13- public static async enqueue ( runId : string , runAt ?: Date , tx ?: PrismaClientOrTransaction ) { }
16+ if ( ! taskRun ) {
17+ logger . error ( "[RequeueTaskRunService] Task run not found" , {
18+ runId,
19+ } ) ;
1420
15- public static async dequeue ( runId : string , tx ?: PrismaClientOrTransaction ) { }
21+ return ;
22+ }
23+
24+ switch ( taskRun . status ) {
25+ case "PENDING" : {
26+ logger . debug ( "[RequeueTaskRunService] Requeueing task run" , { taskRun } ) ;
27+
28+ await marqs ?. nackMessage ( taskRun . id ) ;
29+
30+ break ;
31+ }
32+ case "EXECUTING" :
33+ case "RETRYING_AFTER_FAILURE" : {
34+ logger . debug ( "[RequeueTaskRunService] Failing task run" , { taskRun } ) ;
35+
36+ const service = new FailedTaskRunService ( ) ;
37+
38+ await service . call ( taskRun . friendlyId , {
39+ ok : false ,
40+ id : taskRun . friendlyId ,
41+ retry : undefined ,
42+ error : {
43+ type : "INTERNAL_ERROR" ,
44+ code : "TASK_RUN_HEARTBEAT_TIMEOUT" ,
45+ message : "Did not receive a heartbeat from the worker in time" ,
46+ } ,
47+ } ) ;
48+
49+ break ;
50+ }
51+ case "DELAYED" :
52+ case "WAITING_FOR_DEPLOY" : {
53+ logger . debug ( "[RequeueTaskRunService] Removing task run from queue" , { taskRun } ) ;
54+
55+ await marqs ?. acknowledgeMessage ( taskRun . id ) ;
56+
57+ break ;
58+ }
59+ case "WAITING_TO_RESUME" :
60+ case "PAUSED" : {
61+ logger . debug ( "[RequeueTaskRunService] Requeueing task run" , { taskRun } ) ;
62+
63+ await marqs ?. nackMessage ( taskRun . id ) ;
64+
65+ break ;
66+ }
67+ case "SYSTEM_FAILURE" :
68+ case "INTERRUPTED" :
69+ case "CRASHED" :
70+ case "COMPLETED_WITH_ERRORS" :
71+ case "COMPLETED_SUCCESSFULLY" :
72+ case "EXPIRED" :
73+ case "CANCELED" : {
74+ logger . debug ( "[RequeueTaskRunService] Task run is completed" , { taskRun } ) ;
75+
76+ await marqs ?. acknowledgeMessage ( taskRun . id ) ;
77+
78+ break ;
79+ }
80+ default : {
81+ assertNever ( taskRun . status ) ;
82+ }
83+ }
84+ }
85+
86+ public static async enqueue ( runId : string , runAt ?: Date , tx ?: PrismaClientOrTransaction ) {
87+ return await workerQueue . enqueue (
88+ "v3.requeueTaskRun" ,
89+ { runId } ,
90+ { runAt, jobKey : `requeueTaskRun:${ runId } ` }
91+ ) ;
92+ }
93+
94+ public static async dequeue ( runId : string , tx ?: PrismaClientOrTransaction ) {
95+ return await workerQueue . dequeue ( `requeueTaskRun:${ runId } ` , { tx } ) ;
96+ }
1697}
0 commit comments