Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/recover-stranded-batch-trigger-and-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Recover `batchTriggerAndWait` parents that previously hung forever when a batch's item stream never completed. Batches left unsealed past a configurable timeout (`BATCH_SEAL_TIMEOUT_MS`, default 30 minutes) are now aborted, and the waiting parent resumes with an error instead of waiting indefinitely.
7 changes: 7 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,13 @@ const EnvironmentSchema = z
// in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
// so raise with care. Set to 1 for fully sequential ingestion.
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10),
// Seal-timeout reaper: if a batch's Phase 2 item stream never seals the batch
// (rate-limit, request timeout, crash), abort it after this delay and resume any
// blocked parent (batchTriggerAndWait) with an error instead of hanging forever.
// Must exceed the worst-case legitimate time-to-seal: the SDK retries the stream
// up to maxAttempts (5) times, each attempt bounded by the server request timeout
// (~300s), so the floor is ~5 × requestTimeout. Default 30m leaves headroom.
BATCH_SEAL_TIMEOUT_MS: z.coerce.number().int().positive().default(1_800_000),
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
Expand Down
16 changes: 16 additions & 0 deletions apps/webapp/app/runEngine/services/createBatch.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { type BatchTaskRun, Prisma } from "@trigger.dev/database";
import { Evt } from "evt";
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
Expand Down Expand Up @@ -149,6 +150,21 @@ export class CreateBatchService extends WithRunEngine {

await this._engine.initializeBatch(initOptions);

// Guard the gap between creating the batch and sealing it: if the item
// stream never seals this batch, the reaper aborts it after the timeout
// and resumes any blocked parent with an error instead of leaving it
// suspended forever. If scheduling the reaper itself fails, abort the
// batch now so a blocked parent can't be stranded with nothing to free it.
try {
await this._engine.scheduleExpireBatch({
batchId: batch.id,
availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS),
});
} catch (scheduleError) {
await this._engine.expireBatch({ batchId: batch.id });
throw scheduleError;
}

logger.info("Batch created", {
batchId: friendlyId,
runCount: body.runCount,
Expand Down
26 changes: 26 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ export class RunEngine {
tryCompleteBatch: async ({ payload }) => {
await this.batchSystem.performCompleteBatch({ batchId: payload.batchId });
},
expireBatch: async ({ payload }) => {
await this.batchSystem.expireBatch({ batchId: payload.batchId });
},
continueRunIfUnblocked: async ({ payload }) => {
await this.waitpointSystem.continueRunIfUnblocked({
runId: payload.runId,
Expand Down Expand Up @@ -1640,6 +1643,29 @@ export class RunEngine {
return this.batchSystem.scheduleCompleteBatch({ batchId });
}

/**
 * Abort a batch that was never sealed by its Phase 2 item stream.
 *
 * Delegates to the batch system, which resolves the parent's
 * batchTriggerAndWait waitpoint with an error so the parent resumes rather
 * than staying blocked forever.
 *
 * @param batchId - id of the batch to expire
 */
async expireBatch({ batchId }: { batchId: string }): Promise<void> {
  await this.batchSystem.expireBatch({ batchId });
}

/**
 * Arm the seal-timeout reaper for a batch. Once `availableAt` passes without
 * the batch having sealed, {@link expireBatch} terminally fails it and resumes
 * the parent.
 *
 * @param batchId - id of the batch to guard
 * @param availableAt - earliest time at which the reaper job may run
 */
async scheduleExpireBatch({
  batchId,
  availableAt,
}: {
  batchId: string;
  availableAt: Date;
}): Promise<void> {
  await this.batchSystem.scheduleExpireBatch({ batchId, availableAt });
}

// ============================================================================
// BatchQueue methods (DRR-based batch processing)
// ============================================================================
Expand Down
118 changes: 116 additions & 2 deletions internal-packages/run-engine/src/engine/systems/batchSystem.ts
Comment thread
matt-aitken marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { startSpan } from "@internal/tracing";
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
import { isFinalRunStatus } from "../statuses.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
Expand Down Expand Up @@ -32,6 +33,116 @@ export class BatchSystem {
await this.#tryCompleteBatch({ batchId });
}

/**
 * Enqueue a delayed `expireBatch` worker job for the given batch.
 *
 * @param batchId - id of the batch to expire; also used to derive the job id
 * @param availableAt - forwarded to the worker as the job's `availableAt`
 */
public async scheduleExpireBatch({
  batchId,
  availableAt,
}: {
  batchId: string;
  availableAt: Date;
}): Promise<void> {
  // A stable, batch-derived job id dedupes repeated schedules for the same batch.
  const jobId = `expireBatch:${batchId}`;

  await this.$.worker.enqueue({
    id: jobId,
    job: "expireBatch",
    payload: { batchId },
    availableAt,
  });
}

/**
* Terminally fail a batch whose Phase 2 item stream never sealed it, and resolve
* the parent's batchTriggerAndWait waitpoint with an error so the parent resumes
* with a failure instead of hanging forever.
*
* Idempotent and race-safe: if the stream sealed the batch (or it otherwise
* progressed past an unsealed PENDING state) in the meantime, this is a no-op.
*
* Steps, in order (the order matters for the race guarantees above):
* 1. Load the batch's `status`, `sealed` and `processingCompletedAt`.
* 2. Return early if the batch is missing, sealed, in a status other than
*    PENDING/ABORTED, or PENDING with `processingCompletedAt` already set.
* 3. For a PENDING batch, conditionally flip it to ABORTED (setting
*    `completedAt`); return if no row matched.
* 4. Look up a still-PENDING waitpoint with `completedByBatchId` equal to this
*    batch and complete it with an error output; return if there is none.
*
* The whole operation runs inside an "expireBatch" tracing span tagged with the
* batch id.
*
* @param batchId - id of the BatchTaskRun row to expire
* @returns a promise that settles once the span callback has finished
*/
public async expireBatch({ batchId }: { batchId: string }): Promise<void> {
return startSpan(this.$.tracer, "expireBatch", async (span) => {
span.setAttribute("batchId", batchId);

// Only the fields needed by the guards below are selected.
const batch = await this.$.prisma.batchTaskRun.findFirst({
select: { status: true, sealed: true, processingCompletedAt: true },
where: { id: batchId },
});

if (!batch) {
this.$.logger.debug("expireBatch: batch doesn't exist", { batchId });
return;
}

// The stream sealed the batch, so the normal completion path owns it.
if (batch.sealed) {
this.$.logger.debug("expireBatch: batch already sealed", { batchId });
return;
}

// Terminal states other than ABORTED are done. ABORTED falls through on
// purpose: a previous attempt may have aborted the batch but crashed before
// resolving the waitpoint, and completeWaitpoint is idempotent, so retrying
// can't leave the parent blocked.
if (batch.status !== "PENDING" && batch.status !== "ABORTED") {
this.$.logger.debug("expireBatch: batch in terminal non-aborted state", {
batchId,
status: batch.status,
});
return;
}

// The BatchQueue already processed every item (the completion callback set
// processingCompletedAt) even though the seal never landed — the runs exist
// and will resume the parent on their own. Aborting would fail a healthy batch.
if (batch.status === "PENDING" && batch.processingCompletedAt !== null) {
this.$.logger.debug("expireBatch: items already processed, not aborting", { batchId });
return;
}

// An already-ABORTED batch skips the update and goes straight to the
// waitpoint lookup below.
if (batch.status === "PENDING") {
// Conditional update guards against racing a late seal or completion —
// whichever loses no-ops.
// The where clause re-checks the same conditions as the guards above, so a
// change made since the read results in zero updated rows.
const aborted = await this.$.prisma.batchTaskRun.updateMany({
where: { id: batchId, sealed: false, status: "PENDING", processingCompletedAt: null },
data: {
status: "ABORTED",
completedAt: new Date(),
},
});

if (aborted.count === 0) {
this.$.logger.debug("expireBatch: lost race to seal/complete, no-op", { batchId });
return;
}
}

// Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to
// resolve. The status filter keeps this idempotent if a previous attempt
// already resolved it.
const waitpoint = await this.$.prisma.waitpoint.findFirst({
where: { completedByBatchId: batchId, status: "PENDING" },
});

if (!waitpoint) {
this.$.logger.debug("expireBatch: no pending waitpoint to resolve", { batchId });
return;
}

// Error payload delivered to the parent; it is JSON-serialized into the
// waitpoint output below and flagged with isError.
const error: TaskRunError = {
type: "STRING_ERROR",
raw: "Batch items could not be streamed before the batch timed out",
};

await this.waitpointSystem.completeWaitpoint({
id: waitpoint.id,
output: { value: JSON.stringify(error), isError: true },
});

this.$.logger.warn("expireBatch: aborted unsealed batch and resumed parent with error", {
batchId,
waitpointId: waitpoint.id,
});
});
}

/**
* Checks to see if all runs for a BatchTaskRun are completed, if they are then update the status.
* This isn't used operationally, but it's used for the Batches dashboard page.
Expand All @@ -58,8 +169,11 @@ export class BatchSystem {
return;
}

if (batch.status === "COMPLETED") {
this.$.logger.debug("#tryCompleteBatch: Batch already completed", { batchId });
if (batch.status === "COMPLETED" || batch.status === "ABORTED") {
this.$.logger.debug("#tryCompleteBatch: Batch already in a terminal status", {
batchId,
status: batch.status,
});
return;
}

Expand Down
Loading