Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/realtime-append-cap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Session `.in/append` returns readable 413s on oversize bodies (was failing browser fetches as opaque `TypeError: Failed to fetch`) and now rejects only records that would actually exceed S2's per-record ceiling, instead of guessing at a conservative pre-encoding cap.
18 changes: 12 additions & 6 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ const ParamsSchema = z.object({
// POST: server-side append of a single record to a session channel. Mirrors
// the existing /realtime/v1/streams/:runId/:target/:streamId/append route,
// scoped to a Session primitive.
// S2 enforces a 1 MiB per-record limit (metered as
// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at
// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string
// escaping, and any future per-record header additions all stay comfortably
// below S2's ceiling. See https://s2.dev/docs/limits.
const MAX_APPEND_BODY_BYTES = 1024 * 512;
//
// The HTTP body cap here is just a DoS pre-guard — set generously at
// 1 MiB so we don't buffer arbitrarily large inputs before we can
// compute the wrapped size. The actual S2 per-record limit (verified
// empirically against cloud S2) is enforced precisely inside
// `S2RealtimeStreams.#appendPartByName` — it throws
// `S2RecordTooLargeError` (a `ServiceValidationError` with status
// 413) when the metered record size would exceed S2's 1 MiB ceiling
// after JSON wrapping. That lets legitimate bodies up to ~1023 KiB
// raw through (ASCII or low-escape content) while still rejecting
// pathological all-quote content that would double on wrap.
const MAX_APPEND_BODY_BYTES = 1024 * 1024;

const { action, loader } = createActionApiRoute(
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { json } from "@remix-run/server-runtime";
import { tryCatch } from "@trigger.dev/core/utils";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
Expand All @@ -13,6 +14,7 @@ import {
} from "~/services/routeBuilders/apiBuilder.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/common.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand Down Expand Up @@ -77,13 +79,29 @@ const { action } = createActionApiRoute(
const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`;
const record = JSON.stringify(body.data.data);

// Append the record to the per-stream S2 stream (auto-creates on first write)
await realtimeStream.appendPart(
record,
recordId,
run.friendlyId,
`$trigger.input:${params.streamId}`
// Append the record to the per-stream S2 stream (auto-creates on
// first write). `appendPart` can throw `S2RecordTooLargeError` (a
// `ServiceValidationError` with status 413) when the wrapped
// record exceeds S2's per-record ceiling — surface that as 413
// rather than letting it propagate to the apiBuilder catch-all
// as a generic 500.
const [appendError] = await tryCatch(
realtimeStream.appendPart(
record,
recordId,
run.friendlyId,
`$trigger.input:${params.streamId}`
)
);
if (appendError) {
if (appendError instanceof ServiceValidationError) {
return json(
{ ok: false, error: appendError.message },
{ status: appendError.status ?? 422 }
);
}
throw appendError;
}

// Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none)
// Get first, complete, then delete — so the mapping survives if completeWaitpoint throws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ const ParamsSchema = z.object({
io: z.enum(["out", "in"]),
});

// S2 record body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append
// route — keep it well under S2's 1 MiB per-record limit so JSON wrapping,
// string escaping, and any future per-record headers stay safe.
const MAX_APPEND_BODY_BYTES = 1024 * 512;
// HTTP body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append
// route — DoS pre-guard only. The actual S2 per-record limit is
// enforced precisely by `S2RealtimeStreams.#appendPartByName`
// (throws `S2RecordTooLargeError` with status 413 when the metered
// record size would exceed S2's 1 MiB ceiling after JSON wrapping).
const MAX_APPEND_BODY_BYTES = 1024 * 1024;

// POST: Append a single record to a Session channel from the dashboard
// playground. Mirrors the public `POST /realtime/v1/sessions/:session/:io/append`
Expand Down
38 changes: 37 additions & 1 deletion apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,36 @@ import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions }
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import { headerValue } from "@trigger.dev/core/v3";
import { randomUUID } from "node:crypto";
import { ServiceValidationError } from "~/v3/services/common.server";

// S2's per-record metered-size limit. Verified empirically against
// cloud S2: append succeeds at metered=1048576 and 422s at 1048577
// with `"record must have metered size less than 1 MiB"` (the "less
// than" wording is slightly off — the boundary is inclusive).
//
// Metered size formula:
// metered = 8 (record overhead) + 2*H + Σ(header name + value) + body
// where `body` is the unescaped record body length in UTF-8 bytes and
// `H` is the number of S2 record headers.
//
// We attach no record headers (H=0), so the budget reduces to:
// 8 + body ≤ 1048576 → body ≤ 1048568
export const S2_MAX_METERED_BYTES = 1024 * 1024; // 1 MiB
export const S2_RECORD_BASE_OVERHEAD_BYTES = 8;

/**
* Thrown when a record's metered size would exceed S2's hard per-record
* limit. Caught by the route handler and surfaced as 413.
*/
export class S2RecordTooLargeError extends ServiceValidationError {
constructor(public readonly meteredBytes: number) {
super(
`Record metered size ${meteredBytes} bytes exceeds the S2 per-record limit of ${S2_MAX_METERED_BYTES} bytes. Reduce tool-output size or split into smaller parts.`,
413
);
this.name = "S2RecordTooLargeError";
}
}

export type S2RealtimeStreamsOptions = {
// S2
Expand Down Expand Up @@ -181,8 +211,14 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
async #appendPartByName(part: string, partId: string, s2Stream: string): Promise<void> {
this.logger.debug(`S2 appending to stream`, { part, stream: s2Stream });

const recordBody = JSON.stringify({ data: part, id: partId });
const meteredBytes = Buffer.byteLength(recordBody, "utf8") + S2_RECORD_BASE_OVERHEAD_BYTES;
if (meteredBytes > S2_MAX_METERED_BYTES) {
throw new S2RecordTooLargeError(meteredBytes);
}
Comment thread
ericallam marked this conversation as resolved.

const result = await this.s2Append(s2Stream, {
records: [{ body: JSON.stringify({ data: part, id: partId }) }],
records: [{ body: recordBody }],
Comment thread
ericallam marked this conversation as resolved.
});

this.logger.debug(`S2 append result`, { result });
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,11 @@ export function createActionApiRoute<
const contentLength = request.headers.get("content-length");

if (!contentLength || parseInt(contentLength) > maxContentLength) {
return json({ error: "Request body too large" }, { status: 413 });
return await wrapResponse(
request,
json({ error: "Request body too large" }, { status: 413 }),
corsStrategy !== "none"
);
}
}

Expand Down
Loading