Skip to content

Commit 5d77732

Browse files
committed
feat(webapp): cancel in-progress runs when a billing limit is hit
Optionally cancel in-progress runs on limit hit via a deduplicated bulk-cancel job.
1 parent 408b9f7 commit 5d77732

4 files changed

Lines changed: 277 additions & 0 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import { BillingLimitBulkCancelService } from "./BillingLimitBulkCancelService.server";
2+
3+
export async function runBillingLimitCancelInProgressRuns(
4+
organizationId: string,
5+
hitAt: string
6+
): Promise<{ bulkActionIds: string[] }> {
7+
return BillingLimitBulkCancelService.cancelInProgressRuns(organizationId, { hitAt });
8+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
export type BillingLimitHitPayload = {
2+
organizationId: string;
3+
hitAt: string;
4+
cancelInProgressRuns: boolean;
5+
};
6+
7+
export type BillingLimitHitDeps = {
8+
bustCaches: (organizationId: string) => void;
9+
seedReconcileQueue: (organizationId: string) => Promise<void>;
10+
enqueueConverge: (organizationId: string, targetState: "grace") => Promise<unknown>;
11+
enqueueCancelInProgressRuns: (organizationId: string, hitAt: string) => Promise<unknown>;
12+
};
13+
14+
/** Process billing limit grace hit from the billing platform webhook. */
15+
export async function processBillingLimitHit(
16+
payload: BillingLimitHitPayload,
17+
deps: BillingLimitHitDeps
18+
): Promise<void> {
19+
deps.bustCaches(payload.organizationId);
20+
await deps.seedReconcileQueue(payload.organizationId);
21+
await deps.enqueueConverge(payload.organizationId, "grace");
22+
23+
if (payload.cancelInProgressRuns) {
24+
await deps.enqueueCancelInProgressRuns(payload.organizationId, payload.hitAt);
25+
}
26+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import { describe, expect, vi } from "vitest";
2+
import { setTimeout } from "node:timers/promises";
3+
import { postgresTest, replicationContainerTest } from "@internal/testcontainers";
4+
import { BulkActionType } from "@trigger.dev/database";
5+
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
6+
import {
7+
BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE,
8+
BillingLimitBulkCancelService,
9+
} from "~/v3/services/billingLimit/BillingLimitBulkCancelService.server";
10+
import { countInProgressRunsForBillableEnvironment } from "~/v3/services/billingLimit/billingLimitQueuedRuns.server";
11+
import {
12+
createRuntimeEnvironment,
13+
createTestOrgProjectWithMember,
14+
uniqueId,
15+
} from "./fixtures/environmentVariablesFixtures";
16+
import { setupClickhouseReplication } from "./utils/replicationUtils";
17+
18+
vi.setConfig({ testTimeout: 60_000 });
19+
20+
describe("BillingLimitBulkCancelService.cancelInProgressRuns", () => {
21+
postgresTest("dedupes bulk cancel by hitAt per environment", async ({ prisma }) => {
22+
const { organization, project } = await createTestOrgProjectWithMember(prisma);
23+
const productionEnv = await createRuntimeEnvironment(prisma, {
24+
projectId: project.id,
25+
organizationId: organization.id,
26+
type: "PRODUCTION",
27+
slug: uniqueId("prod"),
28+
});
29+
30+
const hitAt = "2026-06-16T12:00:00.000Z";
31+
32+
await prisma.bulkActionGroup.create({
33+
data: {
34+
id: "bulk_existing",
35+
friendlyId: "bulk_existing",
36+
projectId: project.id,
37+
environmentId: productionEnv.id,
38+
name: "Existing in-progress cancel",
39+
type: BulkActionType.CANCEL,
40+
params: {
41+
statuses: ["EXECUTING"],
42+
finalizeRun: true,
43+
source: BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE,
44+
dedupeKey: hitAt,
45+
},
46+
queryName: "bulk_action_v1",
47+
totalCount: 1,
48+
},
49+
});
50+
51+
const result = await BillingLimitBulkCancelService.cancelInProgressRuns(
52+
organization.id,
53+
{ hitAt },
54+
{ prismaClient: prisma, enqueueProcessBulkAction: async () => undefined }
55+
);
56+
57+
expect(result.bulkActionIds).toEqual(["bulk_existing"]);
58+
59+
const groups = await prisma.bulkActionGroup.findMany({
60+
where: { environmentId: productionEnv.id, type: BulkActionType.CANCEL },
61+
});
62+
63+
expect(groups).toHaveLength(1);
64+
});
65+
66+
replicationContainerTest(
67+
"creates bulk cancel for in-progress runs in billable environments",
68+
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
69+
const { clickhouse } = await setupClickhouseReplication({
70+
prisma,
71+
databaseUrl: postgresContainer.getConnectionUri(),
72+
clickhouseUrl: clickhouseContainer.getConnectionUrl(),
73+
redisOptions,
74+
});
75+
76+
const organization = await prisma.organization.create({
77+
data: { title: "billing-limit-in-progress-runs", slug: "billing-limit-in-progress-runs" },
78+
});
79+
80+
const project = await prisma.project.create({
81+
data: {
82+
name: "billing-limit-in-progress-runs",
83+
slug: "billing-limit-in-progress-runs",
84+
organizationId: organization.id,
85+
externalRef: "billing-limit-in-progress-runs",
86+
},
87+
});
88+
89+
const productionEnv = await prisma.runtimeEnvironment.create({
90+
data: {
91+
slug: "prod",
92+
type: "PRODUCTION",
93+
projectId: project.id,
94+
organizationId: organization.id,
95+
apiKey: "prod",
96+
pkApiKey: "prod",
97+
shortcode: "prod",
98+
},
99+
});
100+
101+
await prisma.taskRun.create({
102+
data: {
103+
friendlyId: "run_executing_prod",
104+
taskIdentifier: "running-task",
105+
status: "EXECUTING",
106+
payload: JSON.stringify({}),
107+
traceId: "trace",
108+
spanId: "span",
109+
queue: "main",
110+
runtimeEnvironmentId: productionEnv.id,
111+
projectId: project.id,
112+
organizationId: organization.id,
113+
environmentType: "PRODUCTION",
114+
engine: "V2",
115+
},
116+
});
117+
118+
await setTimeout(1000);
119+
120+
const runsRepository = new RunsRepository({ prisma, clickhouse });
121+
122+
const count = await countInProgressRunsForBillableEnvironment(
123+
runsRepository,
124+
organization.id,
125+
{ id: productionEnv.id, projectId: project.id }
126+
);
127+
128+
expect(count).toBe(1);
129+
130+
const hitAt = "2026-06-16T12:00:00.000Z";
131+
const enqueuedBulkActionIds: string[] = [];
132+
133+
const result = await BillingLimitBulkCancelService.cancelInProgressRuns(
134+
organization.id,
135+
{ hitAt },
136+
{
137+
prismaClient: prisma,
138+
createRunsRepository: async () => runsRepository,
139+
enqueueProcessBulkAction: async (bulkActionId) => {
140+
enqueuedBulkActionIds.push(bulkActionId);
141+
},
142+
}
143+
);
144+
145+
expect(result.bulkActionIds).toHaveLength(1);
146+
expect(enqueuedBulkActionIds).toHaveLength(1);
147+
148+
const group = await prisma.bulkActionGroup.findFirst({
149+
where: {
150+
environmentId: productionEnv.id,
151+
type: BulkActionType.CANCEL,
152+
},
153+
});
154+
155+
expect(group?.name).toBe("Billing limit hit — cancel in-progress runs");
156+
expect(group?.params).toMatchObject({
157+
source: BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE,
158+
dedupeKey: hitAt,
159+
finalizeRun: true,
160+
});
161+
}
162+
);
163+
});
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { describe, expect, it } from "vitest";
2+
import { BillingLimitHitWebhookBodySchema } from "~/services/billingLimit.schemas";
3+
import {
4+
type BillingLimitHitDeps,
5+
processBillingLimitHit,
6+
} from "~/v3/services/billingLimit/billingLimitHit.server";
7+
8+
describe("billingLimitHit", () => {
9+
it("busts caches, seeds reconcile, and enqueues grace converge", async () => {
10+
const calls: string[] = [];
11+
12+
const deps: BillingLimitHitDeps = {
13+
bustCaches: (organizationId) => {
14+
calls.push(`bust:${organizationId}`);
15+
},
16+
seedReconcileQueue: async (organizationId) => {
17+
calls.push(`seed:${organizationId}`);
18+
},
19+
enqueueConverge: async (organizationId, targetState) => {
20+
calls.push(`converge:${organizationId}:${targetState}`);
21+
},
22+
enqueueCancelInProgressRuns: async () => {
23+
calls.push("cancel");
24+
},
25+
};
26+
27+
await processBillingLimitHit(
28+
{
29+
organizationId: "org_123",
30+
hitAt: "2026-06-16T12:00:00.000Z",
31+
cancelInProgressRuns: false,
32+
},
33+
deps
34+
);
35+
36+
expect(calls).toEqual(["bust:org_123", "seed:org_123", "converge:org_123:grace"]);
37+
});
38+
39+
it("enqueues in-progress cancel when cancelInProgressRuns is true", async () => {
40+
const cancelCalls: Array<{ organizationId: string; hitAt: string }> = [];
41+
42+
const deps: BillingLimitHitDeps = {
43+
bustCaches: () => {},
44+
seedReconcileQueue: async () => {},
45+
enqueueConverge: async () => {},
46+
enqueueCancelInProgressRuns: async (organizationId, hitAt) => {
47+
cancelCalls.push({ organizationId, hitAt });
48+
},
49+
};
50+
51+
await processBillingLimitHit(
52+
{
53+
organizationId: "org_123",
54+
hitAt: "2026-06-16T12:00:00.000Z",
55+
cancelInProgressRuns: true,
56+
},
57+
deps
58+
);
59+
60+
expect(cancelCalls).toEqual([
61+
{ organizationId: "org_123", hitAt: "2026-06-16T12:00:00.000Z" },
62+
]);
63+
});
64+
});
65+
66+
describe("BillingLimitHitWebhookBodySchema", () => {
67+
it("parses the hit webhook body", () => {
68+
expect(
69+
BillingLimitHitWebhookBodySchema.parse({
70+
hitAt: "2026-06-16T12:00:00.000Z",
71+
cancelInProgressRuns: true,
72+
limitState: "grace",
73+
})
74+
).toEqual({
75+
hitAt: "2026-06-16T12:00:00.000Z",
76+
cancelInProgressRuns: true,
77+
limitState: "grace",
78+
});
79+
});
80+
});

0 commit comments

Comments
 (0)