From a03dab7a9251921be5a63a0b4c3aeb5da99b0dc2 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 22 Jun 2026 17:26:45 +0100 Subject: [PATCH 1/2] fix(run-engine,webapp): abort unsealed batches so blocked parents resume batchTriggerAndWait blocks the parent on a waitpoint as soon as the batch is created, but the batch is only sealed once all its items finish streaming. If the item stream fails completely (rate limit, request timeout, crash), the batch stayed PENDING with no runs and the parent waited on its waitpoint forever. A seal-timeout reaper, scheduled when the batch is created, now aborts a batch that is still unsealed after BATCH_SEAL_TIMEOUT_MS (default 30 minutes) and completes the parent's waitpoint with an error so it resumes. The reaper is idempotent and no-ops if the stream sealed the batch in the meantime. --- ...recover-stranded-batch-trigger-and-wait.md | 6 + apps/webapp/app/env.server.ts | 7 + .../runEngine/services/createBatch.server.ts | 9 + .../run-engine/src/engine/index.ts | 26 ++ .../src/engine/systems/batchSystem.ts | 93 +++++++ .../src/engine/tests/batchTwoPhase.test.ts | 258 ++++++++++++++++++ .../run-engine/src/engine/workerCatalog.ts | 6 + 7 files changed, 405 insertions(+) create mode 100644 .server-changes/recover-stranded-batch-trigger-and-wait.md diff --git a/.server-changes/recover-stranded-batch-trigger-and-wait.md b/.server-changes/recover-stranded-batch-trigger-and-wait.md new file mode 100644 index 00000000000..91e42e00676 --- /dev/null +++ b/.server-changes/recover-stranded-batch-trigger-and-wait.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Recover `batchTriggerAndWait` parents that previously hung forever when a batch's item stream never completed. Batches left unsealed past a timeout are now aborted and the waiting parent resumes with an error instead of waiting indefinitely. 
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index dcb4eb11dba..cf474358467 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -768,6 +768,13 @@ const EnvironmentSchema = z // in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE, // so raise with care. Set to 1 for fully sequential ingestion. STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10), + // Seal-timeout reaper: if a batch's Phase 2 item stream never seals the batch + // (rate-limit, request timeout, crash), abort it after this delay and resume any + // blocked parent (batchTriggerAndWait) with an error instead of hanging forever. + // Must exceed the worst-case legitimate time-to-seal: the SDK retries the stream + // up to maxAttempts (5) times, each attempt bounded by the server request timeout + // (~300s), so the floor is ~5 × requestTimeout. Default 30m leaves headroom. + BATCH_SEAL_TIMEOUT_MS: z.coerce.number().int().positive().default(1_800_000), BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100), BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200), BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"), diff --git a/apps/webapp/app/runEngine/services/createBatch.server.ts b/apps/webapp/app/runEngine/services/createBatch.server.ts index 0653e1ef1c2..7e8900f2a74 100644 --- a/apps/webapp/app/runEngine/services/createBatch.server.ts +++ b/apps/webapp/app/runEngine/services/createBatch.server.ts @@ -4,6 +4,7 @@ import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic"; import { type BatchTaskRun, Prisma } from "@trigger.dev/database"; import { Evt } from "evt"; import { prisma, type PrismaClientOrTransaction } from "~/db.server"; +import { env } from "~/env.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ServiceValidationError, WithRunEngine } from 
"../../v3/services/baseService.server"; @@ -149,6 +150,14 @@ export class CreateBatchService extends WithRunEngine { await this._engine.initializeBatch(initOptions); + // Guard the 2-phase gap: if Phase 2 never seals this batch, the reaper + // aborts it after the timeout and resumes any blocked parent with an + // error instead of leaving it suspended forever. + await this._engine.scheduleExpireBatch({ + batchId: batch.id, + availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS), + }); + logger.info("Batch created", { batchId: friendlyId, runCount: body.runCount, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a6a20b5b9fd..8673e31c398 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -263,6 +263,9 @@ export class RunEngine { tryCompleteBatch: async ({ payload }) => { await this.batchSystem.performCompleteBatch({ batchId: payload.batchId }); }, + expireBatch: async ({ payload }) => { + await this.batchSystem.expireBatch({ batchId: payload.batchId }); + }, continueRunIfUnblocked: async ({ payload }) => { await this.waitpointSystem.continueRunIfUnblocked({ runId: payload.runId, @@ -1640,6 +1643,29 @@ export class RunEngine { return this.batchSystem.scheduleCompleteBatch({ batchId }); } + /** + * Terminally fail a batch whose Phase 2 item stream never sealed, resolving the + * parent's batchTriggerAndWait waitpoint with an error so the parent resumes + * instead of hanging forever. + */ + async expireBatch({ batchId }: { batchId: string }): Promise<void> { + return this.batchSystem.expireBatch({ batchId }); + } + + /** + * Schedule the seal-timeout reaper for a batch. If the batch hasn't sealed by + * `availableAt`, {@link expireBatch} terminally fails it and resumes the parent. 
+ */ + async scheduleExpireBatch({ + batchId, + availableAt, + }: { + batchId: string; + availableAt: Date; + }): Promise<void> { + return this.batchSystem.scheduleExpireBatch({ batchId, availableAt }); + } + // ============================================================================ // BatchQueue methods (DRR-based batch processing) // ============================================================================ diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index a3d44507a46..52b4f9f45b8 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -1,4 +1,5 @@ import { startSpan } from "@internal/tracing"; +import { TaskRunError } from "@trigger.dev/core/v3/schemas"; import { isFinalRunStatus } from "../statuses.js"; import { SystemResources } from "./systems.js"; import { WaitpointSystem } from "./waitpointSystem.js"; @@ -32,6 +33,98 @@ export class BatchSystem { await this.#tryCompleteBatch({ batchId }); } + public async scheduleExpireBatch({ + batchId, + availableAt, + }: { + batchId: string; + availableAt: Date; + }): Promise<void> { + await this.$.worker.enqueue({ + // Stable id dedupes repeated schedules for the same batch. + id: `expireBatch:${batchId}`, + job: "expireBatch", + payload: { batchId }, + availableAt, + }); + } + + /** + * Terminally fail a batch whose Phase 2 item stream never sealed it, and resolve + * the parent's batchTriggerAndWait waitpoint with an error so the parent resumes + * with a failure instead of hanging forever. + * + * Idempotent and race-safe: if the stream sealed the batch (or it otherwise + * progressed past an unsealed PENDING state) in the meantime, this is a no-op. 
+ */ + public async expireBatch({ batchId }: { batchId: string }): Promise<void> { + return startSpan(this.$.tracer, "expireBatch", async (span) => { + span.setAttribute("batchId", batchId); + + const batch = await this.$.prisma.batchTaskRun.findFirst({ + select: { status: true, sealed: true }, + where: { id: batchId }, + }); + + if (!batch) { + this.$.logger.debug("expireBatch: batch doesn't exist", { batchId }); + return; + } + + // The stream sealed the batch, or it already progressed — nothing to fail. + if (batch.sealed || batch.status !== "PENDING") { + this.$.logger.debug("expireBatch: batch already sealed or no longer PENDING", { + batchId, + status: batch.status, + sealed: batch.sealed, + }); + return; + } + + // Conditional update guards against racing a late seal — whichever loses no-ops. + const aborted = await this.$.prisma.batchTaskRun.updateMany({ + where: { id: batchId, sealed: false, status: "PENDING" }, + data: { + status: "ABORTED", + completedAt: new Date(), + processingCompletedAt: new Date(), + }, + }); + + if (aborted.count === 0) { + this.$.logger.debug("expireBatch: lost race to seal, no-op", { batchId }); + return; + } + + // Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to resolve. 
+ const waitpoint = await this.$.prisma.waitpoint.findFirst({ + where: { completedByBatchId: batchId }, + }); + + if (!waitpoint) { + this.$.logger.debug("expireBatch: no waitpoint to resolve (fire-and-forget batch)", { + batchId, + }); + return; + } + + const error: TaskRunError = { + type: "STRING_ERROR", + raw: "Batch items could not be streamed before the batch timed out", + }; + + await this.waitpointSystem.completeWaitpoint({ + id: waitpoint.id, + output: { value: JSON.stringify(error), isError: true }, + }); + + this.$.logger.warn("expireBatch: aborted unsealed batch and resumed parent with error", { + batchId, + waitpointId: waitpoint.id, + }); + }); + } + /** * Checks to see if all runs for a BatchTaskRun are completed, if they are then update the status. * This isn't used operationally, but it's used for the Batches dashboard page. diff --git a/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts b/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts index 8471c07844b..ec876a83ba1 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts @@ -565,6 +565,264 @@ describe("RunEngine 2-Phase Batch API", () => { } ); + containerTest( + "2-phase batch: expireBatch aborts an unsealed batch and resumes the parent with an error", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 20, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + 
}, + batchQueue: { + redis: redisOptions, + consumerCount: 2, + consumerIntervalMs: 50, + drr: { + quantum: 10, + maxDeficit: 100, + }, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + // Phase 2 never runs for this batch, so neither callback should fire. + engine.setBatchProcessItemCallback(async () => { + return { success: true, runId: "should-not-be-called" }; + }); + engine.setBatchCompletionCallback(async () => {}); + + try { + const parentTask = "parent-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]); + + // Phase 1: create the batch record (PENDING, unsealed) — mirrors CreateBatchService + const { id: batchId, friendlyId: batchFriendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId: batchFriendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 2, + expectedCount: 2, + sealed: false, + batchVersion: "runengine:v2", + }, + }); + + // Trigger and start the parent attempt + const parentRun = await engine.trigger( + { + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_parent", + spanId: "s_parent", + workerQueue: "main", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeued.length).toBe(1); + + const initialExecutionData = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(initialExecutionData); + await engine.startRunAttempt({ + runId: parentRun.id, + snapshotId: initialExecutionData.snapshot.id, + }); + + // Phase 1 continued: block the parent on the batch and initialize its metadata + await engine.blockRunWithCreatedBatch({ + runId: parentRun.id, + 
batchId, + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + await engine.initializeBatch({ + batchId, + friendlyId: batchFriendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + parentRunId: parentRun.id, + resumeParentOnCompletion: true, + }); + + const afterBlocked = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(afterBlocked); + expect(afterBlocked.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + // Phase 2 never seals the batch. The seal-timeout reaper fires. + await engine.expireBatch({ batchId }); + + // The batch is terminally failed... + const abortedBatch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(abortedBatch?.status).toBe("ABORTED"); + expect(abortedBatch?.completedAt).not.toBeNull(); + + // ...its waitpoint is completed with an error... + const waitpoint = await prisma.waitpoint.findFirst({ + where: { completedByBatchId: batchId }, + }); + assertNonNullable(waitpoint); + expect(waitpoint.status).toBe("COMPLETED"); + expect(waitpoint.outputIsError).toBe(true); + + // ...and the parent is unblocked and resumes instead of hanging forever. 
+ await vi.waitFor( + async () => { + const waitpoints = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(waitpoints.length).toBe(0); + }, + { timeout: 15000 } + ); + + const parentAfter = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(parentAfter); + expect(parentAfter.snapshot.executionStatus).toBe("EXECUTING"); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: a scheduled seal-timeout aborts an unsealed batch", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 20, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + batchQueue: { + redis: redisOptions, + consumerCount: 2, + consumerIntervalMs: 50, + drr: { + quantum: 10, + maxDeficit: 100, + }, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + engine.setBatchProcessItemCallback(async () => { + return { success: true, runId: "should-not-be-called" }; + }); + engine.setBatchCompletionCallback(async () => {}); + + try { + // Phase 1: create + initialize the batch, but never stream/seal it. 
+ const { id: batchId, friendlyId: batchFriendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId: batchFriendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 3, + expectedCount: 3, + sealed: false, + batchVersion: "runengine:v2", + }, + }); + + await engine.initializeBatch({ + batchId, + friendlyId: batchFriendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 3, + }); + + // Schedule the seal-timeout to fire immediately. + await engine.scheduleExpireBatch({ batchId, availableAt: new Date() }); + + // The worker picks up the job and aborts the unsealed batch. + await vi.waitFor( + async () => { + const batchRecord = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(batchRecord?.status).toBe("ABORTED"); + }, + { timeout: 15000 } + ); + } finally { + await engine.quit(); + } + } + ); + containerTest( "2-phase batch: error if batch not initialized", async ({ prisma, redisOptions }) => { diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index de54c1ece00..8f49d7d6068 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -58,6 +58,12 @@ export const workerCatalog = { }), visibilityTimeoutMs: 30_000, }, + expireBatch: { + schema: z.object({ + batchId: z.string(), + }), + visibilityTimeoutMs: 30_000, + }, continueRunIfUnblocked: { schema: z.object({ runId: z.string(), From 79bd92e17eecf7568b966a843745557238cd4737 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 22 Jun 2026 18:05:35 +0100 Subject: [PATCH 2/2] fix(run-engine,webapp): harden the batch seal-timeout reaper Don't abort a batch whose items were all processed 
(processingCompletedAt set) before the seal landed; those runs resume the parent on their own. Keep expireBatch idempotent: an already-aborted batch still resolves its parent waitpoint, so a retry after a mid-run crash can't leave the parent blocked. Treat ABORTED as terminal in tryCompleteBatch so a straggler run finalizing can't flip the batch back to COMPLETED. Abort the batch if scheduling the reaper fails, so a blocked parent is never stranded with nothing to free it. --- .../runEngine/services/createBatch.server.ts | 21 +- .../src/engine/systems/batchSystem.ts | 69 ++++-- .../src/engine/tests/batchTwoPhase.test.ts | 216 ++++++++++++++++++ 3 files changed, 275 insertions(+), 31 deletions(-) diff --git a/apps/webapp/app/runEngine/services/createBatch.server.ts b/apps/webapp/app/runEngine/services/createBatch.server.ts index 7e8900f2a74..1d7cfefa20a 100644 --- a/apps/webapp/app/runEngine/services/createBatch.server.ts +++ b/apps/webapp/app/runEngine/services/createBatch.server.ts @@ -150,13 +150,20 @@ export class CreateBatchService extends WithRunEngine { await this._engine.initializeBatch(initOptions); - // Guard the 2-phase gap: if Phase 2 never seals this batch, the reaper - // aborts it after the timeout and resumes any blocked parent with an - // error instead of leaving it suspended forever. - await this._engine.scheduleExpireBatch({ - batchId: batch.id, - availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS), - }); + // Guard the gap between creating the batch and sealing it: if the item + // stream never seals this batch, the reaper aborts it after the timeout + // and resumes any blocked parent with an error instead of leaving it + // suspended forever. If scheduling the reaper itself fails, abort the + // batch now so a blocked parent can't be stranded with nothing to free it. 
+ try { + await this._engine.scheduleExpireBatch({ + batchId: batch.id, + availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS), + }); + } catch (scheduleError) { + await this._engine.expireBatch({ batchId: batch.id }); + throw scheduleError; + } logger.info("Batch created", { batchId: friendlyId, diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index 52b4f9f45b8..07b09d8a590 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -62,7 +62,7 @@ export class BatchSystem { span.setAttribute("batchId", batchId); const batch = await this.$.prisma.batchTaskRun.findFirst({ - select: { status: true, sealed: true }, + select: { status: true, sealed: true, processingCompletedAt: true }, where: { id: batchId }, }); @@ -71,40 +71,58 @@ export class BatchSystem { return; } - // The stream sealed the batch, or it already progressed — nothing to fail. - if (batch.sealed || batch.status !== "PENDING") { - this.$.logger.debug("expireBatch: batch already sealed or no longer PENDING", { + // The stream sealed the batch, so the normal completion path owns it. + if (batch.sealed) { + this.$.logger.debug("expireBatch: batch already sealed", { batchId }); + return; + } + + // Terminal states other than ABORTED are done. ABORTED falls through on + // purpose: a previous attempt may have aborted the batch but crashed before + // resolving the waitpoint, and completeWaitpoint is idempotent, so retrying + // can't leave the parent blocked. + if (batch.status !== "PENDING" && batch.status !== "ABORTED") { + this.$.logger.debug("expireBatch: batch in terminal non-aborted state", { batchId, status: batch.status, - sealed: batch.sealed, }); return; } - // Conditional update guards against racing a late seal — whichever loses no-ops. 
- const aborted = await this.$.prisma.batchTaskRun.updateMany({ - where: { id: batchId, sealed: false, status: "PENDING" }, - data: { - status: "ABORTED", - completedAt: new Date(), - processingCompletedAt: new Date(), - }, - }); - - if (aborted.count === 0) { - this.$.logger.debug("expireBatch: lost race to seal, no-op", { batchId }); + // The BatchQueue already processed every item (the completion callback set + // processingCompletedAt) even though the seal never landed — the runs exist + // and will resume the parent on their own. Aborting would fail a healthy batch. + if (batch.status === "PENDING" && batch.processingCompletedAt !== null) { + this.$.logger.debug("expireBatch: items already processed, not aborting", { batchId }); return; } - // Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to resolve. + if (batch.status === "PENDING") { + // Conditional update guards against racing a late seal or completion — + // whichever loses no-ops. + const aborted = await this.$.prisma.batchTaskRun.updateMany({ + where: { id: batchId, sealed: false, status: "PENDING", processingCompletedAt: null }, + data: { + status: "ABORTED", + completedAt: new Date(), + }, + }); + + if (aborted.count === 0) { + this.$.logger.debug("expireBatch: lost race to seal/complete, no-op", { batchId }); + return; + } + } + + // Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to + // resolve. The status filter keeps this idempotent if a previous attempt + // already resolved it. 
const waitpoint = await this.$.prisma.waitpoint.findFirst({ - where: { completedByBatchId: batchId }, + where: { completedByBatchId: batchId, status: "PENDING" }, }); if (!waitpoint) { - this.$.logger.debug("expireBatch: no waitpoint to resolve (fire-and-forget batch)", { - batchId, - }); + this.$.logger.debug("expireBatch: no pending waitpoint to resolve", { batchId }); return; } @@ -151,8 +169,11 @@ export class BatchSystem { return; } - if (batch.status === "COMPLETED") { - this.$.logger.debug("#tryCompleteBatch: Batch already completed", { batchId }); + if (batch.status === "COMPLETED" || batch.status === "ABORTED") { + this.$.logger.debug("#tryCompleteBatch: Batch already in a terminal status", { + batchId, + status: batch.status, + }); return; } diff --git a/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts b/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts index ec876a83ba1..56249e43098 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts @@ -13,9 +13,53 @@ import type { BatchItem, InitializeBatchOptions, } from "../../batch-queue/types.js"; +import { type PrismaClient } from "@trigger.dev/database"; +import { type RedisOptions } from "ioredis"; vi.setConfig({ testTimeout: 60_000 }); +function createBatchTestEngine(prisma: PrismaClient, redisOptions: RedisOptions) { + return new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 20, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + batchQueue: { + redis: redisOptions, + consumerCount: 2, + consumerIntervalMs: 50, 
+ drr: { + quantum: 10, + maxDeficit: 100, + }, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); +} + describe("RunEngine 2-Phase Batch API", () => { containerTest( "2-phase batch: initialize batch, stream items one by one, items get processed", @@ -823,6 +867,178 @@ describe("RunEngine 2-Phase Batch API", () => { } ); + containerTest( + "2-phase batch: expireBatch does not abort a batch whose items were all processed", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createBatchTestEngine(prisma, redisOptions); + engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" })); + engine.setBatchCompletionCallback(async () => {}); + + try { + // The BatchQueue streamed and processed every item (the completion callback + // set processingCompletedAt) but the seal never landed. The runs exist and + // will resume the parent on their own, so the reaper must NOT abort this. 
+ const { id: batchId, friendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 2, + expectedCount: 2, + sealed: false, + batchVersion: "runengine:v2", + processingCompletedAt: new Date(), + }, + }); + + await engine.expireBatch({ batchId }); + + const batch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(batch?.status).toBe("PENDING"); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: expireBatch resumes the parent even if a prior attempt already aborted the batch", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createBatchTestEngine(prisma, redisOptions); + engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" })); + engine.setBatchCompletionCallback(async () => {}); + + try { + const parentTask = "parent-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]); + + const { id: batchId, friendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 2, + expectedCount: 2, + sealed: false, + batchVersion: "runengine:v2", + }, + }); + + const parentRun = await engine.trigger( + { + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_parent", + spanId: "s_parent", + workerQueue: "main", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + 
expect(dequeued.length).toBe(1); + + const initial = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(initial); + await engine.startRunAttempt({ runId: parentRun.id, snapshotId: initial.snapshot.id }); + + await engine.blockRunWithCreatedBatch({ + runId: parentRun.id, + batchId, + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + // Simulate a prior expireBatch attempt that aborted the batch but crashed + // before resolving the parent's waitpoint. + await prisma.batchTaskRun.update({ + where: { id: batchId }, + data: { status: "ABORTED", completedAt: new Date() }, + }); + + // The retry must still resolve the parent's waitpoint so it can't stay stuck. + await engine.expireBatch({ batchId }); + + const waitpoint = await prisma.waitpoint.findFirst({ + where: { completedByBatchId: batchId }, + }); + assertNonNullable(waitpoint); + expect(waitpoint.status).toBe("COMPLETED"); + expect(waitpoint.outputIsError).toBe(true); + + await vi.waitFor( + async () => { + const wps = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(wps.length).toBe(0); + }, + { timeout: 15000 } + ); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: tryCompleteBatch leaves an ABORTED batch terminal", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createBatchTestEngine(prisma, redisOptions); + engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" })); + engine.setBatchCompletionCallback(async () => {}); + + try { + // An aborted batch is terminal: a later tryCompleteBatch (e.g. from a + // straggler child run finalizing) must not flip it back to COMPLETED. 
+ const { id: batchId, friendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "ABORTED", + runCount: 0, + expectedCount: 0, + sealed: false, + batchVersion: "runengine:v2", + completedAt: new Date(), + }, + }); + + await engine.tryCompleteBatch({ batchId }); + + // Allow the debounced completion job to run (or correctly no-op). + await setTimeout(1500); + + const batch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(batch?.status).toBe("ABORTED"); + } finally { + await engine.quit(); + } + } + ); + containerTest( "2-phase batch: error if batch not initialized", async ({ prisma, redisOptions }) => {