|
| 1 | +import { describe, expect, vi } from "vitest"; |
| 2 | +import { setTimeout } from "node:timers/promises"; |
| 3 | +import { postgresTest, replicationContainerTest } from "@internal/testcontainers"; |
| 4 | +import { BulkActionType } from "@trigger.dev/database"; |
| 5 | +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; |
| 6 | +import { |
| 7 | + BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE, |
| 8 | + BillingLimitBulkCancelService, |
| 9 | +} from "~/v3/services/billingLimit/BillingLimitBulkCancelService.server"; |
| 10 | +import { countInProgressRunsForBillableEnvironment } from "~/v3/services/billingLimit/billingLimitQueuedRuns.server"; |
| 11 | +import { |
| 12 | + createRuntimeEnvironment, |
| 13 | + createTestOrgProjectWithMember, |
| 14 | + uniqueId, |
| 15 | +} from "./fixtures/environmentVariablesFixtures"; |
| 16 | +import { setupClickhouseReplication } from "./utils/replicationUtils"; |
| 17 | + |
| 18 | +vi.setConfig({ testTimeout: 60_000 }); |
| 19 | + |
| 20 | +describe("BillingLimitBulkCancelService.cancelInProgressRuns", () => { |
| 21 | + postgresTest("dedupes bulk cancel by hitAt per environment", async ({ prisma }) => { |
| 22 | + const { organization, project } = await createTestOrgProjectWithMember(prisma); |
| 23 | + const productionEnv = await createRuntimeEnvironment(prisma, { |
| 24 | + projectId: project.id, |
| 25 | + organizationId: organization.id, |
| 26 | + type: "PRODUCTION", |
| 27 | + slug: uniqueId("prod"), |
| 28 | + }); |
| 29 | + |
| 30 | + const hitAt = "2026-06-16T12:00:00.000Z"; |
| 31 | + |
| 32 | + await prisma.bulkActionGroup.create({ |
| 33 | + data: { |
| 34 | + id: "bulk_existing", |
| 35 | + friendlyId: "bulk_existing", |
| 36 | + projectId: project.id, |
| 37 | + environmentId: productionEnv.id, |
| 38 | + name: "Existing in-progress cancel", |
| 39 | + type: BulkActionType.CANCEL, |
| 40 | + params: { |
| 41 | + statuses: ["EXECUTING"], |
| 42 | + finalizeRun: true, |
| 43 | + source: BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE, |
| 44 | + dedupeKey: hitAt, |
| 45 | + }, |
| 46 | + queryName: "bulk_action_v1", |
| 47 | + totalCount: 1, |
| 48 | + }, |
| 49 | + }); |
| 50 | + |
| 51 | + const result = await BillingLimitBulkCancelService.cancelInProgressRuns( |
| 52 | + organization.id, |
| 53 | + { hitAt }, |
| 54 | + { prismaClient: prisma, enqueueProcessBulkAction: async () => undefined } |
| 55 | + ); |
| 56 | + |
| 57 | + expect(result.bulkActionIds).toEqual(["bulk_existing"]); |
| 58 | + |
| 59 | + const groups = await prisma.bulkActionGroup.findMany({ |
| 60 | + where: { environmentId: productionEnv.id, type: BulkActionType.CANCEL }, |
| 61 | + }); |
| 62 | + |
| 63 | + expect(groups).toHaveLength(1); |
| 64 | + }); |
| 65 | + |
| 66 | + replicationContainerTest( |
| 67 | + "creates bulk cancel for in-progress runs in billable environments", |
| 68 | + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { |
| 69 | + const { clickhouse } = await setupClickhouseReplication({ |
| 70 | + prisma, |
| 71 | + databaseUrl: postgresContainer.getConnectionUri(), |
| 72 | + clickhouseUrl: clickhouseContainer.getConnectionUrl(), |
| 73 | + redisOptions, |
| 74 | + }); |
| 75 | + |
| 76 | + const organization = await prisma.organization.create({ |
| 77 | + data: { title: "billing-limit-in-progress-runs", slug: "billing-limit-in-progress-runs" }, |
| 78 | + }); |
| 79 | + |
| 80 | + const project = await prisma.project.create({ |
| 81 | + data: { |
| 82 | + name: "billing-limit-in-progress-runs", |
| 83 | + slug: "billing-limit-in-progress-runs", |
| 84 | + organizationId: organization.id, |
| 85 | + externalRef: "billing-limit-in-progress-runs", |
| 86 | + }, |
| 87 | + }); |
| 88 | + |
| 89 | + const productionEnv = await prisma.runtimeEnvironment.create({ |
| 90 | + data: { |
| 91 | + slug: "prod", |
| 92 | + type: "PRODUCTION", |
| 93 | + projectId: project.id, |
| 94 | + organizationId: organization.id, |
| 95 | + apiKey: "prod", |
| 96 | + pkApiKey: "prod", |
| 97 | + shortcode: "prod", |
| 98 | + }, |
| 99 | + }); |
| 100 | + |
| 101 | + await prisma.taskRun.create({ |
| 102 | + data: { |
| 103 | + friendlyId: "run_executing_prod", |
| 104 | + taskIdentifier: "running-task", |
| 105 | + status: "EXECUTING", |
| 106 | + payload: JSON.stringify({}), |
| 107 | + traceId: "trace", |
| 108 | + spanId: "span", |
| 109 | + queue: "main", |
| 110 | + runtimeEnvironmentId: productionEnv.id, |
| 111 | + projectId: project.id, |
| 112 | + organizationId: organization.id, |
| 113 | + environmentType: "PRODUCTION", |
| 114 | + engine: "V2", |
| 115 | + }, |
| 116 | + }); |
| 117 | + |
| 118 | + await setTimeout(1000); |
| 119 | + |
| 120 | + const runsRepository = new RunsRepository({ prisma, clickhouse }); |
| 121 | + |
| 122 | + const count = await countInProgressRunsForBillableEnvironment( |
| 123 | + runsRepository, |
| 124 | + organization.id, |
| 125 | + { id: productionEnv.id, projectId: project.id } |
| 126 | + ); |
| 127 | + |
| 128 | + expect(count).toBe(1); |
| 129 | + |
| 130 | + const hitAt = "2026-06-16T12:00:00.000Z"; |
| 131 | + const enqueuedBulkActionIds: string[] = []; |
| 132 | + |
| 133 | + const result = await BillingLimitBulkCancelService.cancelInProgressRuns( |
| 134 | + organization.id, |
| 135 | + { hitAt }, |
| 136 | + { |
| 137 | + prismaClient: prisma, |
| 138 | + createRunsRepository: async () => runsRepository, |
| 139 | + enqueueProcessBulkAction: async (bulkActionId) => { |
| 140 | + enqueuedBulkActionIds.push(bulkActionId); |
| 141 | + }, |
| 142 | + } |
| 143 | + ); |
| 144 | + |
| 145 | + expect(result.bulkActionIds).toHaveLength(1); |
| 146 | + expect(enqueuedBulkActionIds).toHaveLength(1); |
| 147 | + |
| 148 | + const group = await prisma.bulkActionGroup.findFirst({ |
| 149 | + where: { |
| 150 | + environmentId: productionEnv.id, |
| 151 | + type: BulkActionType.CANCEL, |
| 152 | + }, |
| 153 | + }); |
| 154 | + |
| 155 | + expect(group?.name).toBe("Billing limit hit — cancel in-progress runs"); |
| 156 | + expect(group?.params).toMatchObject({ |
| 157 | + source: BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE, |
| 158 | + dedupeKey: hitAt, |
| 159 | + finalizeRun: true, |
| 160 | + }); |
| 161 | + } |
| 162 | + ); |
| 163 | +}); |
0 commit comments