diff --git a/.server-changes/realtime-append-cap.md b/.server-changes/realtime-append-cap.md new file mode 100644 index 0000000000..cfbd3b7fa1 --- /dev/null +++ b/.server-changes/realtime-append-cap.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Session `.in/append` returns readable 413s on oversize bodies (was failing browser fetches as opaque `TypeError: Failed to fetch`) and now rejects only records that would actually exceed S2's per-record ceiling, instead of guessing at a conservative pre-encoding cap. diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index 792b6480d8..dbdb9f47d5 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -27,12 +27,18 @@ const ParamsSchema = z.object({ // POST: server-side append of a single record to a session channel. Mirrors // the existing /realtime/v1/streams/:runId/:target/:streamId/append route, // scoped to a Session primitive. -// S2 enforces a 1 MiB per-record limit (metered as -// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at -// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string -// escaping, and any future per-record header additions all stay comfortably -// below S2's ceiling. See https://s2.dev/docs/limits. -const MAX_APPEND_BODY_BYTES = 1024 * 512; +// +// The HTTP body cap here is just a DoS pre-guard — set generously at +// 1 MiB so we don't buffer arbitrarily large inputs before we can +// compute the wrapped size. The actual S2 per-record limit (verified +// empirically against cloud S2) is enforced precisely inside +// `S2RealtimeStreams.#appendPartByName` — it throws +// `S2RecordTooLargeError` (a `ServiceValidationError` with status +// 413) when the metered record size would exceed S2's 1 MiB ceiling +// after JSON wrapping. That lets legitimate bodies up to ~1023 KiB +// raw through (ASCII or low-escape content) while still rejecting +// pathological all-quote content that would double on wrap. +const MAX_APPEND_BODY_BYTES = 1024 * 1024; const { action, loader } = createActionApiRoute( { diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts index 335116043d..a404e6a76a 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -1,4 +1,5 @@ import { json } from "@remix-run/server-runtime"; +import { tryCatch } from "@trigger.dev/core/utils"; import { z } from "zod"; import { $replica } from "~/db.server"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; @@ -13,6 +14,7 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { engine } from "~/v3/runEngine.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -77,13 +79,29 @@ const { action } = createActionApiRoute( const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`; const record = JSON.stringify(body.data.data); - // Append the record to the per-stream S2 stream (auto-creates on first write) - await realtimeStream.appendPart( - record, - recordId, - run.friendlyId, - `$trigger.input:${params.streamId}` + // Append the record to the per-stream S2 stream (auto-creates on + // first write). `appendPart` can throw `S2RecordTooLargeError` (a + // `ServiceValidationError` with status 413) when the wrapped + // record exceeds S2's per-record ceiling — surface that as 413 + // rather than letting it propagate to the apiBuilder catch-all + // as a generic 500. + const [appendError] = await tryCatch( + realtimeStream.appendPart( + record, + recordId, + run.friendlyId, + `$trigger.input:${params.streamId}` + ) ); + if (appendError) { + if (appendError instanceof ServiceValidationError) { + return json( + { ok: false, error: appendError.message }, + { status: appendError.status ?? 422 } + ); + } + throw appendError; + } // Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none) // Get first, complete, then delete — so the mapping survives if completeWaitpoint throws diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts index 04d6ec2d8e..77f03f4ce5 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts @@ -24,10 +24,12 @@ const ParamsSchema = z.object({ io: z.enum(["out", "in"]), }); -// S2 record body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append -// route — keep it well under S2's 1 MiB per-record limit so JSON wrapping, -// string escaping, and any future per-record headers stay safe. -const MAX_APPEND_BODY_BYTES = 1024 * 512; +// HTTP body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append +// route — DoS pre-guard only. The actual S2 per-record limit is +// enforced precisely by `S2RealtimeStreams.#appendPartByName` +// (throws `S2RecordTooLargeError` with status 413 when the metered +// record size would exceed S2's 1 MiB ceiling after JSON wrapping). +const MAX_APPEND_BODY_BYTES = 1024 * 1024; // POST: Append a single record to a Session channel from the dashboard // playground. Mirrors the public `POST /realtime/v1/sessions/:session/:io/append` diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 0706107144..b91f47c6b3 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -4,6 +4,36 @@ import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions } import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { headerValue } from "@trigger.dev/core/v3"; import { randomUUID } from "node:crypto"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +// S2's per-record metered-size limit. Verified empirically against +// cloud S2: append succeeds at metered=1048576 and 422s at 1048577 +// with `"record must have metered size less than 1 MiB"` (the "less +// than" wording is slightly off — the boundary is inclusive). +// +// Metered size formula: +// metered = 8 (record overhead) + 2*H + Σ(header name + value) + body +// where `body` is the unescaped record body length in UTF-8 bytes and +// `H` is the number of S2 record headers. +// +// We attach no record headers (H=0), so the budget reduces to: +// 8 + body ≤ 1048576 → body ≤ 1048568 +export const S2_MAX_METERED_BYTES = 1024 * 1024; // 1 MiB +export const S2_RECORD_BASE_OVERHEAD_BYTES = 8; + +/** + * Thrown when a record's metered size would exceed S2's hard per-record + * limit. Caught by the route handler and surfaced as 413. + */ +export class S2RecordTooLargeError extends ServiceValidationError { + constructor(public readonly meteredBytes: number) { + super( + `Record metered size ${meteredBytes} bytes exceeds the S2 per-record limit of ${S2_MAX_METERED_BYTES} bytes. Reduce tool-output size or split into smaller parts.`, + 413 + ); + this.name = "S2RecordTooLargeError"; + } +} export type S2RealtimeStreamsOptions = { // S2 @@ -181,8 +211,14 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { async #appendPartByName(part: string, partId: string, s2Stream: string): Promise { this.logger.debug(`S2 appending to stream`, { part, stream: s2Stream }); + const recordBody = JSON.stringify({ data: part, id: partId }); + const meteredBytes = Buffer.byteLength(recordBody, "utf8") + S2_RECORD_BASE_OVERHEAD_BYTES; + if (meteredBytes > S2_MAX_METERED_BYTES) { + throw new S2RecordTooLargeError(meteredBytes); + } + const result = await this.s2Append(s2Stream, { - records: [{ body: JSON.stringify({ data: part, id: partId }) }], + records: [{ body: recordBody }], }); this.logger.debug(`S2 append result`, { result }); diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index a66de63e11..0c661a7e68 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -777,7 +777,11 @@ export function createActionApiRoute< const contentLength = request.headers.get("content-length"); if (!contentLength || parseInt(contentLength) > maxContentLength) { - return json({ error: "Request body too large" }, { status: 413 }); + return await wrapResponse( + request, + json({ error: "Request body too large" }, { status: 413 }), + corsStrategy !== "none" + ); } }