Skip to content

Commit 74db2de

Browse files
committed
Use graphile strategy 0 (no named queues) and remove all named queues
1 parent 93acca6 commit 74db2de

8 files changed

Lines changed: 30 additions & 31 deletions

File tree

apps/webapp/app/services/sources/handleHttpSource.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ export class HandleHttpSourceService {
6161
id: delivery.id,
6262
},
6363
{
64-
queueName: `deliver:${triggerSource.id}`,
6564
tx,
6665
maxAttempts:
6766
triggerSource.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined,

apps/webapp/app/services/worker.server.ts

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,21 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { env } from "~/env.server";
66
import { ZodWorker } from "~/platform/zodWorker.server";
7+
import { eventRepository } from "~/v3/eventRepository.server";
8+
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
9+
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
10+
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
11+
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
12+
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
13+
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
714
import { IndexDeploymentService } from "~/v3/services/indexDeployment.server";
15+
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
16+
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
17+
import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server";
18+
import { RetryAttemptService } from "~/v3/services/retryAttempt.server";
19+
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
20+
import { TriggerScheduledTaskService } from "~/v3/services/triggerScheduledTask.server";
21+
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";
822
import { ExpireDispatcherService } from "./dispatchers/expireDispatcher.server";
923
import { InvokeEphemeralDispatcherService } from "./dispatchers/invokeEphemeralEventDispatcher.server";
1024
import { sendEmail } from "./email.server";
@@ -30,22 +44,6 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
3044
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
3145
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
3246
import { ResumeTaskService } from "./tasks/resumeTask.server";
33-
import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server";
34-
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
35-
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
36-
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
37-
import { eventRepository } from "~/v3/eventRepository.server";
38-
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
39-
import { TriggerScheduledTaskService } from "~/v3/services/triggerScheduledTask.server";
40-
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
41-
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
42-
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
43-
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";
44-
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
45-
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
46-
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
47-
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
48-
import { RetryAttemptService } from "~/v3/services/retryAttempt.server";
4947

5048
const workerCatalog = {
5149
indexEndpoint: z.object({
@@ -363,7 +361,6 @@ function getWorkerQueue() {
363361
deliverHttpSourceRequest: {
364362
priority: 0, // smaller number = higher priority
365363
maxAttempts: 14,
366-
queueName: (payload) => `sources:${payload.id}`,
367364
handler: async (payload, job) => {
368365
const service = new DeliverHttpSourceRequestService();
369366

@@ -373,7 +370,6 @@ function getWorkerQueue() {
373370
deliverWebhookRequest: {
374371
priority: 0, // smaller number = higher priority
375372
maxAttempts: 14,
376-
queueName: (payload) => `webhooks:${payload.id}`,
377373
handler: async (payload, job) => {
378374
const service = new DeliverWebhookRequestService();
379375

apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ export class PerformDeploymentAlertsService extends BaseService {
5858
},
5959
});
6060

61-
await DeliverAlertService.enqueue(alert.id, tx, {
62-
queueName: `alert-channel:${alertChannel.id}`,
63-
});
61+
await DeliverAlertService.enqueue(alert.id, tx);
6462
});
6563
}
6664

apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ export class PerformTaskAttemptAlertsService extends BaseService {
5959
},
6060
});
6161

62-
await DeliverAlertService.enqueue(alert.id, tx, {
63-
queueName: `alert-channel:${alertChannel.id}`,
64-
});
62+
await DeliverAlertService.enqueue(alert.id, tx);
6563
});
6664
}
6765

apps/webapp/app/v3/services/bulk/performBulkAction.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ export class PerformBulkActionService extends BaseService {
8888
},
8989
{
9090
jobKey: `performBulkActionItem:${bulkActionItemId}`,
91-
queueName: `bulkActionItem:${groupId}`,
9291
}
9392
);
9493
}

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ export class ResumeBatchRunService extends BaseService {
107107
{
108108
tx,
109109
runAt,
110-
queueName: `resumeBatchRun-${batchRunId}`,
110+
jobKey: `resumeBatchRun-${batchRunId}`,
111111
}
112112
);
113113
}

patches/graphile-worker@0.16.6.patch

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
diff --git a/dist/sql/getJob.js b/dist/sql/getJob.js
2-
index 70cc9b49d7d08c8dd32214f15c463b2a568abd15..bb3fe15317f139edd2b24c5046512813ae88f36e 100644
2+
index 70cc9b49d7d08c8dd32214f15c463b2a568abd15..5bcb50a4544046e56f6d5dd70e96e26e59998ad5 100644
33
--- a/dist/sql/getJob.js
44
+++ b/dist/sql/getJob.js
5+
@@ -61,7 +61,7 @@ async function getJob(compiledSharedOptions, withPgClient, tasks, workerId, flag
6+
*
7+
* I recommend you either use strat 0 if you can, or strat 2 otherwise.
8+
*/
9+
- const strategy = 2;
10+
+ const strategy = 0;
11+
const queueClause = strategy === 0
12+
? `and jobs.job_queue_id is null`
13+
: strategy === 1
514
@@ -153,6 +153,13 @@ with j as (
615
const name = !preparedStatements
716
? undefined

pnpm-lock.yaml

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)