diff --git a/.server-changes/recover-stranded-batch-trigger-and-wait.md b/.server-changes/recover-stranded-batch-trigger-and-wait.md new file mode 100644 index 0000000000..91e42e0067 --- /dev/null +++ b/.server-changes/recover-stranded-batch-trigger-and-wait.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Recover `batchTriggerAndWait` parents that previously hung forever when a batch's item stream never completed. Batches left unsealed past a timeout are now aborted and the waiting parent resumes with an error instead of waiting indefinitely. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index dcb4eb11db..cf47435846 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -768,6 +768,13 @@ const EnvironmentSchema = z // in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE, // so raise with care. Set to 1 for fully sequential ingestion. STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10), + // Seal-timeout reaper: if a batch's Phase 2 item stream never seals the batch + // (rate-limit, request timeout, crash), abort it after this delay and resume any + // blocked parent (batchTriggerAndWait) with an error instead of hanging forever. + // Must exceed the worst-case legitimate time-to-seal: the SDK retries the stream + // up to maxAttempts (5) times, each attempt bounded by the server request timeout + // (~300s), so the floor is ~5 × requestTimeout. Default 30m leaves headroom. + BATCH_SEAL_TIMEOUT_MS: z.coerce.number().int().positive().default(1_800_000), BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100), BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200), BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"), diff --git a/apps/webapp/app/runEngine/services/createBatch.server.ts b/apps/webapp/app/runEngine/services/createBatch.server.ts index 0653e1ef1c..1d7cfefa20 100644 --- a/apps/webapp/app/runEngine/services/createBatch.server.ts +++ b/apps/webapp/app/runEngine/services/createBatch.server.ts @@ -4,6 +4,7 @@ import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic"; import { type BatchTaskRun, Prisma } from "@trigger.dev/database"; import { Evt } from "evt"; import { prisma, type PrismaClientOrTransaction } from "~/db.server"; +import { env } from "~/env.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; @@ -149,6 +150,21 @@ export class CreateBatchService extends WithRunEngine { await this._engine.initializeBatch(initOptions); + // Guard the gap between creating the batch and sealing it: if the item + // stream never seals this batch, the reaper aborts it after the timeout + // and resumes any blocked parent with an error instead of leaving it + // suspended forever. If scheduling the reaper itself fails, abort the + // batch now so a blocked parent can't be stranded with nothing to free it. + try { + await this._engine.scheduleExpireBatch({ + batchId: batch.id, + availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS), + }); + } catch (scheduleError) { + await this._engine.expireBatch({ batchId: batch.id }); + throw scheduleError; + } + logger.info("Batch created", { batchId: friendlyId, runCount: body.runCount, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a6a20b5b9f..8673e31c39 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -263,6 +263,9 @@ export class RunEngine { tryCompleteBatch: async ({ payload }) => { await this.batchSystem.performCompleteBatch({ batchId: payload.batchId }); }, + expireBatch: async ({ payload }) => { + await this.batchSystem.expireBatch({ batchId: payload.batchId }); + }, continueRunIfUnblocked: async ({ payload }) => { await this.waitpointSystem.continueRunIfUnblocked({ runId: payload.runId, @@ -1640,6 +1643,29 @@ export class RunEngine { return this.batchSystem.scheduleCompleteBatch({ batchId }); } + /** + * Terminally fail a batch whose Phase 2 item stream never sealed, resolving the + * parent's batchTriggerAndWait waitpoint with an error so the parent resumes + * instead of hanging forever. + */ + async expireBatch({ batchId }: { batchId: string }): Promise { + return this.batchSystem.expireBatch({ batchId }); + } + + /** + * Schedule the seal-timeout reaper for a batch. If the batch hasn't sealed by + * `availableAt`, {@link expireBatch} terminally fails it and resumes the parent. + */ + async scheduleExpireBatch({ + batchId, + availableAt, + }: { + batchId: string; + availableAt: Date; + }): Promise { + return this.batchSystem.scheduleExpireBatch({ batchId, availableAt }); + } + // ============================================================================ // BatchQueue methods (DRR-based batch processing) // ============================================================================ diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index a3d44507a4..07b09d8a59 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -1,4 +1,5 @@ import { startSpan } from "@internal/tracing"; +import { TaskRunError } from "@trigger.dev/core/v3/schemas"; import { isFinalRunStatus } from "../statuses.js"; import { SystemResources } from "./systems.js"; import { WaitpointSystem } from "./waitpointSystem.js"; @@ -32,6 +33,116 @@ export class BatchSystem { await this.#tryCompleteBatch({ batchId }); } + public async scheduleExpireBatch({ + batchId, + availableAt, + }: { + batchId: string; + availableAt: Date; + }): Promise { + await this.$.worker.enqueue({ + // Stable id dedupes repeated schedules for the same batch. + id: `expireBatch:${batchId}`, + job: "expireBatch", + payload: { batchId }, + availableAt, + }); + } + + /** + * Terminally fail a batch whose Phase 2 item stream never sealed it, and resolve + * the parent's batchTriggerAndWait waitpoint with an error so the parent resumes + * with a failure instead of hanging forever. + * + * Idempotent and race-safe: if the stream sealed the batch (or it otherwise + * progressed past an unsealed PENDING state) in the meantime, this is a no-op. + */ + public async expireBatch({ batchId }: { batchId: string }): Promise { + return startSpan(this.$.tracer, "expireBatch", async (span) => { + span.setAttribute("batchId", batchId); + + const batch = await this.$.prisma.batchTaskRun.findFirst({ + select: { status: true, sealed: true, processingCompletedAt: true }, + where: { id: batchId }, + }); + + if (!batch) { + this.$.logger.debug("expireBatch: batch doesn't exist", { batchId }); + return; + } + + // The stream sealed the batch, so the normal completion path owns it. + if (batch.sealed) { + this.$.logger.debug("expireBatch: batch already sealed", { batchId }); + return; + } + + // Terminal states other than ABORTED are done. ABORTED falls through on + // purpose: a previous attempt may have aborted the batch but crashed before + // resolving the waitpoint, and completeWaitpoint is idempotent, so retrying + // can't leave the parent blocked. + if (batch.status !== "PENDING" && batch.status !== "ABORTED") { + this.$.logger.debug("expireBatch: batch in terminal non-aborted state", { + batchId, + status: batch.status, + }); + return; + } + + // The BatchQueue already processed every item (the completion callback set + // processingCompletedAt) even though the seal never landed — the runs exist + // and will resume the parent on their own. Aborting would fail a healthy batch. + if (batch.status === "PENDING" && batch.processingCompletedAt !== null) { + this.$.logger.debug("expireBatch: items already processed, not aborting", { batchId }); + return; + } + + if (batch.status === "PENDING") { + // Conditional update guards against racing a late seal or completion — + // whichever loses no-ops. + const aborted = await this.$.prisma.batchTaskRun.updateMany({ + where: { id: batchId, sealed: false, status: "PENDING", processingCompletedAt: null }, + data: { + status: "ABORTED", + completedAt: new Date(), + }, + }); + + if (aborted.count === 0) { + this.$.logger.debug("expireBatch: lost race to seal/complete, no-op", { batchId }); + return; + } + } + + // Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to + // resolve. The status filter keeps this idempotent if a previous attempt + // already resolved it. + const waitpoint = await this.$.prisma.waitpoint.findFirst({ + where: { completedByBatchId: batchId, status: "PENDING" }, + }); + + if (!waitpoint) { + this.$.logger.debug("expireBatch: no pending waitpoint to resolve", { batchId }); + return; + } + + const error: TaskRunError = { + type: "STRING_ERROR", + raw: "Batch items could not be streamed before the batch timed out", + }; + + await this.waitpointSystem.completeWaitpoint({ + id: waitpoint.id, + output: { value: JSON.stringify(error), isError: true }, + }); + + this.$.logger.warn("expireBatch: aborted unsealed batch and resumed parent with error", { + batchId, + waitpointId: waitpoint.id, + }); + }); + } + /** * Checks to see if all runs for a BatchTaskRun are completed, if they are then update the status. * This isn't used operationally, but it's used for the Batches dashboard page. @@ -58,8 +169,11 @@ export class BatchSystem { return; } - if (batch.status === "COMPLETED") { - this.$.logger.debug("#tryCompleteBatch: Batch already completed", { batchId }); + if (batch.status === "COMPLETED" || batch.status === "ABORTED") { + this.$.logger.debug("#tryCompleteBatch: Batch already in a terminal status", { + batchId, + status: batch.status, + }); return; } diff --git a/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts b/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts index 8471c07844..56249e4309 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts @@ -13,9 +13,53 @@ import type { BatchItem, InitializeBatchOptions, } from "../../batch-queue/types.js"; +import { type PrismaClient } from "@trigger.dev/database"; +import { type RedisOptions } from "ioredis"; vi.setConfig({ testTimeout: 60_000 }); +function createBatchTestEngine(prisma: PrismaClient, redisOptions: RedisOptions) { + return new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 20, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + batchQueue: { + redis: redisOptions, + consumerCount: 2, + consumerIntervalMs: 50, + drr: { + quantum: 10, + maxDeficit: 100, + }, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); +} + describe("RunEngine 2-Phase Batch API", () => { containerTest( "2-phase batch: initialize batch, stream items one by one, items get processed", @@ -565,6 +609,436 @@ describe("RunEngine 2-Phase Batch API", () => { } ); + containerTest( + "2-phase batch: expireBatch aborts an unsealed batch and resumes the parent with an error", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 20, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + batchQueue: { + redis: redisOptions, + consumerCount: 2, + consumerIntervalMs: 50, + drr: { + quantum: 10, + maxDeficit: 100, + }, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + // Phase 2 never runs for this batch, so neither callback should fire. + engine.setBatchProcessItemCallback(async () => { + return { success: true, runId: "should-not-be-called" }; + }); + engine.setBatchCompletionCallback(async () => {}); + + try { + const parentTask = "parent-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]); + + // Phase 1: create the batch record (PENDING, unsealed) — mirrors CreateBatchService + const { id: batchId, friendlyId: batchFriendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId: batchFriendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 2, + expectedCount: 2, + sealed: false, + batchVersion: "runengine:v2", + }, + }); + + // Trigger and start the parent attempt + const parentRun = await engine.trigger( + { + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_parent", + spanId: "s_parent", + workerQueue: "main", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeued.length).toBe(1); + + const initialExecutionData = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(initialExecutionData); + await engine.startRunAttempt({ + runId: parentRun.id, + snapshotId: initialExecutionData.snapshot.id, + }); + + // Phase 1 continued: block the parent on the batch and initialize its metadata + await engine.blockRunWithCreatedBatch({ + runId: parentRun.id, + batchId, + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + await engine.initializeBatch({ + batchId, + friendlyId: batchFriendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + parentRunId: parentRun.id, + resumeParentOnCompletion: true, + }); + + const afterBlocked = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(afterBlocked); + expect(afterBlocked.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + // Phase 2 never seals the batch. The seal-timeout reaper fires. + await engine.expireBatch({ batchId }); + + // The batch is terminally failed... + const abortedBatch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(abortedBatch?.status).toBe("ABORTED"); + expect(abortedBatch?.completedAt).not.toBeNull(); + + // ...its waitpoint is completed with an error... + const waitpoint = await prisma.waitpoint.findFirst({ + where: { completedByBatchId: batchId }, + }); + assertNonNullable(waitpoint); + expect(waitpoint.status).toBe("COMPLETED"); + expect(waitpoint.outputIsError).toBe(true); + + // ...and the parent is unblocked and resumes instead of hanging forever. + await vi.waitFor( + async () => { + const waitpoints = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(waitpoints.length).toBe(0); + }, + { timeout: 15000 } + ); + + const parentAfter = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(parentAfter); + expect(parentAfter.snapshot.executionStatus).toBe("EXECUTING"); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: a scheduled seal-timeout aborts an unsealed batch", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 20, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + batchQueue: { + redis: redisOptions, + consumerCount: 2, + consumerIntervalMs: 50, + drr: { + quantum: 10, + maxDeficit: 100, + }, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + engine.setBatchProcessItemCallback(async () => { + return { success: true, runId: "should-not-be-called" }; + }); + engine.setBatchCompletionCallback(async () => {}); + + try { + // Phase 1: create + initialize the batch, but never stream/seal it. + const { id: batchId, friendlyId: batchFriendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId: batchFriendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 3, + expectedCount: 3, + sealed: false, + batchVersion: "runengine:v2", + }, + }); + + await engine.initializeBatch({ + batchId, + friendlyId: batchFriendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 3, + }); + + // Schedule the seal-timeout to fire immediately. + await engine.scheduleExpireBatch({ batchId, availableAt: new Date() }); + + // The worker picks up the job and aborts the unsealed batch. + await vi.waitFor( + async () => { + const batchRecord = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(batchRecord?.status).toBe("ABORTED"); + }, + { timeout: 15000 } + ); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: expireBatch does not abort a batch whose items were all processed", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createBatchTestEngine(prisma, redisOptions); + engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" })); + engine.setBatchCompletionCallback(async () => {}); + + try { + // The BatchQueue streamed and processed every item (the completion callback + // set processingCompletedAt) but the seal never landed. The runs exist and + // will resume the parent on their own, so the reaper must NOT abort this. + const { id: batchId, friendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 2, + expectedCount: 2, + sealed: false, + batchVersion: "runengine:v2", + processingCompletedAt: new Date(), + }, + }); + + await engine.expireBatch({ batchId }); + + const batch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(batch?.status).toBe("PENDING"); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: expireBatch resumes the parent even if a prior attempt already aborted the batch", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createBatchTestEngine(prisma, redisOptions); + engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" })); + engine.setBatchCompletionCallback(async () => {}); + + try { + const parentTask = "parent-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]); + + const { id: batchId, friendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "PENDING", + runCount: 2, + expectedCount: 2, + sealed: false, + batchVersion: "runengine:v2", + }, + }); + + const parentRun = await engine.trigger( + { + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_parent", + spanId: "s_parent", + workerQueue: "main", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeued.length).toBe(1); + + const initial = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(initial); + await engine.startRunAttempt({ runId: parentRun.id, snapshotId: initial.snapshot.id }); + + await engine.blockRunWithCreatedBatch({ + runId: parentRun.id, + batchId, + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + // Simulate a prior expireBatch attempt that aborted the batch but crashed + // before resolving the parent's waitpoint. + await prisma.batchTaskRun.update({ + where: { id: batchId }, + data: { status: "ABORTED", completedAt: new Date() }, + }); + + // The retry must still resolve the parent's waitpoint so it can't stay stuck. + await engine.expireBatch({ batchId }); + + const waitpoint = await prisma.waitpoint.findFirst({ + where: { completedByBatchId: batchId }, + }); + assertNonNullable(waitpoint); + expect(waitpoint.status).toBe("COMPLETED"); + expect(waitpoint.outputIsError).toBe(true); + + await vi.waitFor( + async () => { + const wps = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(wps.length).toBe(0); + }, + { timeout: 15000 } + ); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "2-phase batch: tryCompleteBatch leaves an ABORTED batch terminal", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createBatchTestEngine(prisma, redisOptions); + engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" })); + engine.setBatchCompletionCallback(async () => {}); + + try { + // An aborted batch is terminal: a later tryCompleteBatch (e.g. from a + // straggler child run finalizing) must not flip it back to COMPLETED. + const { id: batchId, friendlyId } = BatchId.generate(); + await prisma.batchTaskRun.create({ + data: { + id: batchId, + friendlyId, + runtimeEnvironmentId: authenticatedEnvironment.id, + status: "ABORTED", + runCount: 0, + expectedCount: 0, + sealed: false, + batchVersion: "runengine:v2", + completedAt: new Date(), + }, + }); + + await engine.tryCompleteBatch({ batchId }); + + // Allow the debounced completion job to run (or correctly no-op). + await setTimeout(1500); + + const batch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } }); + expect(batch?.status).toBe("ABORTED"); + } finally { + await engine.quit(); + } + } + ); + containerTest( "2-phase batch: error if batch not initialized", async ({ prisma, redisOptions }) => { diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index de54c1ece0..8f49d7d606 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -58,6 +58,12 @@ export const workerCatalog = { }), visibilityTimeoutMs: 30_000, }, + expireBatch: { + schema: z.object({ + batchId: z.string(), + }), + visibilityTimeoutMs: 30_000, + }, continueRunIfUnblocked: { schema: z.object({ runId: z.string(),