From b3aa85b6416d4b17e12e229fe751d6d38df6f7c0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 16 May 2026 23:16:18 +0100 Subject: [PATCH 1/4] feat(webapp,core,sdk,cli): bound session.out via per-turn trim After each `trigger:turn-complete`, the agent appends an S2 `trim` command back to the previous turn-complete's seq_num. `session.out` stays roughly one turn long at steady state, regardless of how long the chat has been running. `trigger:turn-complete` and `trigger:upgrade-required` move from `chunk.type`-shaped data records into header-form control records under a `trigger-control` namespace. Built-in transports (`TriggerChatTransport`, `AgentChat`, dashboard `AgentView`) handle this transparently; custom transports need a one-line filter on the `trigger-control` header. The Sessions detail page in the dashboard fetches the per-turn S3 snapshot via a presigned URL and seeds the transcript view, then SSE-tails from the snapshot's `lastOutEventId`. Bandwidth and time-to-first-render scale with unread turns, not session lifetime. 
--- .../components/runs/v3/agent/AgentView.tsx | 124 +++++++++++-- .../presenters/v3/SessionPresenter.server.ts | 52 ++++++ .../route.tsx | 2 + .../realtime/s2realtimeStreams.server.ts | 138 +++++++++----- apps/webapp/app/services/realtime/types.ts | 13 +- packages/cli-v3/src/mcp/tools/agentChat.ts | 90 +++++----- packages/core/src/v3/apiClient/index.ts | 34 +++- .../core/src/v3/apiClient/runStream.test.ts | 154 ++++++++++++++++ packages/core/src/v3/apiClient/runStream.ts | 48 ++++- packages/core/src/v3/realtime-streams-api.ts | 6 + packages/core/src/v3/realtimeStreams/index.ts | 6 + .../realtimeStreams/sessionStreamOneshot.ts | 136 ++++++++++++++ packages/core/src/v3/session-streams-api.ts | 2 + .../src/v3/sessionStreams/chatSnapshot.ts | 55 ++++++ .../core/src/v3/sessionStreams/manager.ts | 8 + .../src/v3/sessionStreams/wireProtocol.ts | 81 +++++++++ packages/trigger-sdk/src/v3/ai.ts | 170 +++++++++++------- packages/trigger-sdk/src/v3/chat-client.ts | 42 ++--- packages/trigger-sdk/src/v3/chat-server.ts | 46 +++-- packages/trigger-sdk/src/v3/chat.test.ts | 49 ++++- packages/trigger-sdk/src/v3/chat.ts | 65 ++++--- packages/trigger-sdk/src/v3/sessions.ts | 35 ++++ .../src/v3/test/test-session-handle.ts | 33 ++++ 23 files changed, 1151 insertions(+), 238 deletions(-) create mode 100644 packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts create mode 100644 packages/core/src/v3/sessionStreams/chatSnapshot.ts create mode 100644 packages/core/src/v3/sessionStreams/wireProtocol.ts diff --git a/apps/webapp/app/components/runs/v3/agent/AgentView.tsx b/apps/webapp/app/components/runs/v3/agent/AgentView.tsx index eee7646d03f..cbe5cec8f3c 100644 --- a/apps/webapp/app/components/runs/v3/agent/AgentView.tsx +++ b/apps/webapp/app/components/runs/v3/agent/AgentView.tsx @@ -1,5 +1,5 @@ import type { UIMessage } from "@ai-sdk/react"; -import { SSEStreamSubscription } from "@trigger.dev/core/v3"; +import { ChatSnapshotV1Schema, SSEStreamSubscription } from 
"@trigger.dev/core/v3"; import { useEffect, useMemo, useRef, useState } from "react"; import { Paragraph } from "~/components/primitives/Paragraph"; import { Spinner } from "~/components/primitives/Spinner"; @@ -27,6 +27,15 @@ export type AgentViewAuth = { * channel and is merged in by the AgentView subscription. */ initialMessages: UIMessage[]; + /** + * Presigned GET URL for the session's chat-snapshot S3 blob (written + * by the agent after each turn-complete; see `ChatSnapshotV1`). + * Optional — sessions that registered a `hydrateMessages` hook skip + * snapshot writes and the URL fetch will 404. In that case the + * dashboard falls back to seq=0 SSE (which, post-trim, shows only the + * most recent turn). Generated server-side by `SessionPresenter`. + */ + snapshotPresignedUrl?: string; }; /** @@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) { projectSlug: project.slug, envSlug: environment.slug, initialMessages: agentView.initialMessages, + snapshotPresignedUrl: agentView.snapshotPresignedUrl, }); // Sticky-bottom auto-scroll: walks up to find the inspector's scroll @@ -120,6 +130,11 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) { * - `kind: "stop"` is a stop signal — no messages, nothing to render * here, so it's filtered. * + * Wire payloads are slim-wire (one new UIMessage per record, on + * `payload.message`). The legacy `payload.messages` array shape is kept + * here as a fallback so any historical records on a long-lived session + * still render. + * * The server wraps records in `{data, id}` and writes `data` as a JSON * string; SSE v2 delivers the parsed string back. {@link parseChunkPayload} * re-parses to recover the object. 
@@ -127,6 +142,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) { type InputStreamChunk = { kind?: "message" | "stop"; payload?: { + message?: { id?: string; role?: string; parts?: unknown[] }; messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>; trigger?: string; }; @@ -217,6 +233,7 @@ function useAgentSessionMessages({ projectSlug, envSlug, initialMessages, + snapshotPresignedUrl, }: { sessionId: string; apiOrigin: string; @@ -224,6 +241,7 @@ function useAgentSessionMessages({ projectSlug: string; envSlug: string; initialMessages: UIMessage[]; + snapshotPresignedUrl?: string; }): UIMessage[] { // Seed with the user messages from the run's task payload. const seedMessages = useMemo( @@ -285,6 +303,56 @@ function useAgentSessionMessages({ const outputUrl = `${sessionBase}/out`; const inputUrl = `${sessionBase}/in`; + /** + * Try to seed `pendingRef` from the agent's S3 snapshot blob and return + * the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes + * just past the snapshot. Returns undefined for sessions that don't + * have a snapshot (e.g. `hydrateMessages` customers, or sessions that + * have never completed a turn). + */ + const loadSnapshot = async (): Promise => { + if (!snapshotPresignedUrl) return undefined; + try { + const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal }); + if (!resp.ok) return undefined; + const json = (await resp.json()) as unknown; + const parsed = ChatSnapshotV1Schema.safeParse(json); + if (!parsed.success) return undefined; + const snapshot = parsed.data; + // Preserve the snapshot's array order in the final render by + // giving each message a unique, monotonically increasing + // timestamp from `(savedAt - count + index)`. Real chunk + // timestamps from the SSE path use S2 arrival ms (positive + // numbers in the present), so anything below `savedAt` sorts + // before live chunks while preserving snapshot order among + // themselves. 
+ const count = snapshot.messages.length; + snapshot.messages.forEach((raw, i) => { + const message = raw as UIMessage; + if (!message?.id) return; + // The snapshot's seed wins over the task-payload seed for any + // overlapping ids (the snapshot represents the agent's + // canonical accumulator, post-turn). + pendingRef.current.set(message.id, message); + if (!timestampsRef.current.has(message.id)) { + timestampsRef.current.set(message.id, snapshot.savedAt - count + i); + } + }); + scheduleFlush.current(); + return snapshot.lastOutEventId; + } catch { + // 404 / network / parse / abort — fall back to seq=0 SSE + return undefined; + } + }; + + const outputSubOptions = (lastEventId: string | undefined) => + ({ + signal: abort.signal, + timeoutInSeconds: 120, + ...(lastEventId !== undefined ? { lastEventId } : {}), + }) as const; + const commonSubOptions = { signal: abort.signal, timeoutInSeconds: 120, @@ -292,20 +360,35 @@ function useAgentSessionMessages({ // ---- Output stream: assistant messages --------------------------------- // - // The output stream delivers UIMessageChunks interleaved with - // Trigger-specific control chunks (`trigger:turn-complete`, etc.). We - // filter the control chunks and fold everything else into an assistant - // `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's - // `readUIMessageStream` helper is only available in `ai@6`, and the - // webapp is pinned to `ai@4`, so we re-implement just the chunk types - // that `renderPart` actually displays. + // The output stream delivers data records (UIMessageChunks) interleaved + // with Trigger control records (`turn-complete`, `upgrade-required`) and + // S2 command records (`trim`). Control + command records ride on + // `record.headers` with empty bodies; the SSE parser strips S2 command + // records entirely, and control records arrive with `value.chunk === + // undefined`, which `parseChunkPayload` drops below. 
+ // + // We fold everything else into an assistant `UIMessage` via our own + // `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream` + // helper is only available in `ai@6`, and the webapp is pinned to + // `ai@4`, so we re-implement just the chunk types that `renderPart` + // actually displays. // // We capture the **server timestamp of each assistant message's first // `start` chunk** so later sort-by-timestamp merges with the input // stream correctly. const runOutput = async () => { try { - const sub = new SSEStreamSubscription(outputUrl, commonSubOptions); + // Seed messages from the snapshot first (if available), then + // resume the SSE from the snapshot's last event id so we don't + // re-stream chunks already represented in the snapshot. If no + // snapshot exists (no URL, 404, parse failure), the SSE opens + // at seq=0 — which, post-trim, contains roughly one turn of + // records (acceptable fallback for `hydrateMessages` sessions + // and fresh sessions). + const snapshotLastEventId = await loadSnapshot(); + if (abort.signal.aborted) return; + + const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId)); const raw = await sub.subscribe(); const reader = raw.getReader(); @@ -318,6 +401,12 @@ function useAgentSessionMessages({ const chunk = parseChunkPayload(value.chunk) as OutputChunk | null; if (!chunk || typeof chunk.type !== "string") continue; + // Legacy belt-and-suspenders: prior versions of the SDK + // emitted `trigger:turn-complete` / `trigger:upgrade-required` + // as data records (`type` field). Current versions use + // header-form control records, which `parseChunkPayload` + // drops above. Keep this filter to handle any in-flight + // sessions whose `.out` was populated by the older SDK. 
if (chunk.type.startsWith("trigger:")) continue; if (chunk.type === "start") { @@ -413,9 +502,18 @@ function useAgentSessionMessages({ const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null; if (!chunk || chunk.kind !== "message") continue; const payload = chunk.payload; - if (!payload || !Array.isArray(payload.messages)) continue; - - const incomingUsers = payload.messages.filter( + if (!payload) continue; + + // Slim-wire is one UIMessage on `payload.message`; legacy + // payloads carried an array on `payload.messages`. Accept + // either so historical records on a long-lived session still + // render. + const candidates = Array.isArray(payload.messages) + ? payload.messages + : payload.message + ? [payload.message] + : []; + const incomingUsers = candidates.filter( (m): m is UIMessage => m != null && (m as { role?: string }).role === "user" && typeof m.id === "string" ); @@ -454,7 +552,7 @@ function useAgentSessionMessages({ pendingTimerRef.current = null; } }; - }, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]); + }, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]); return useMemo(() => { const timestamps = timestampsRef.current; diff --git a/apps/webapp/app/presenters/v3/SessionPresenter.server.ts b/apps/webapp/app/presenters/v3/SessionPresenter.server.ts index 27807971d5a..4d75abb85b5 100644 --- a/apps/webapp/app/presenters/v3/SessionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SessionPresenter.server.ts @@ -1,8 +1,11 @@ import { type Span } from "@opentelemetry/api"; +import { chatSnapshotKeySuffix } from "@trigger.dev/core/v3"; import { type PrismaClientOrTransaction } from "@trigger.dev/database"; import { env } from "~/env.server"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; +import { logger } from "~/services/logger.server"; +import { generatePresignedUrl } from 
"~/v3/objectStore.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { startActiveSpan } from "~/v3/tracer.server"; @@ -15,6 +18,8 @@ export class SessionPresenter { userId: string; environmentId: string; sessionParam: string; + projectExternalRef: string; + environmentSlug: string; }) { return startActiveSpan( "SessionPresenter.call", @@ -33,10 +38,14 @@ export class SessionPresenter { userId, environmentId, sessionParam, + projectExternalRef, + environmentSlug, }: { userId: string; environmentId: string; sessionParam: string; + projectExternalRef: string; + environmentSlug: string; }, rootSpan: Span ) { @@ -112,6 +121,48 @@ export class SessionPresenter { // unused — kept here to match the existing `AgentViewAuth` shape. const addressingKey = session.externalId ?? session.friendlyId; + // Presign a GET URL for the agent's S3 snapshot blob. The browser + // fetches it directly, parses + validates, and seeds the + // TriggerChatTransport with the full history + lastEventId before + // opening the SSE. Presign succeeds regardless of whether the blob + // exists; the frontend handles 404 gracefully. + // + // Snapshots are only written when no `hydrateMessages` hook is + // registered — sessions that use `hydrateMessages` will 404 here + // and the dashboard falls back to seq=0 SSE (which, post-trim, + // shows only the most recent turn — accepted, those customers + // have their own DB-backed dashboards). + // The agent writes snapshots keyed on the session's friendlyId (the + // `session_*` form), which matches what the SDK's `chat.agent` payload + // carries as `sessionId`. Use the same key shape here so the dashboard + // hits the same S3 object. 
+ let snapshotPresignedUrl: string | undefined; + try { + const signed = await startActiveSpan( + "SessionPresenter.presignSnapshot", + async () => + generatePresignedUrl( + projectExternalRef, + environmentSlug, + chatSnapshotKeySuffix(session.friendlyId), + "GET" + ) + ); + if (signed.success) { + snapshotPresignedUrl = signed.url; + } else { + logger.warn("SessionPresenter: snapshot presign failed", { + sessionId: session.id, + error: signed.error, + }); + } + } catch (error) { + logger.warn("SessionPresenter: snapshot presign threw", { + sessionId: session.id, + error: error instanceof Error ? error.message : String(error), + }); + } + return { id: session.id, friendlyId: session.friendlyId, @@ -147,6 +198,7 @@ export class SessionPresenter { apiOrigin: env.API_ORIGIN || env.LOGIN_ORIGIN, sessionId: addressingKey, initialMessages: [], + snapshotPresignedUrl, }, }; } diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx index 496a5fb6295..c873dd9f406 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx @@ -79,6 +79,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { userId, environmentId: environment.id, sessionParam, + projectExternalRef: project.externalRef, + environmentSlug: environment.slug, }); if (!session) { diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 4c735d21d46..0553ef77f9b 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -2,6 
+2,7 @@ import type { UnkeyCache } from "@internal/cache"; import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions } from "./types"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; +import { headerValue } from "@trigger.dev/core/v3"; import { randomUUID } from "node:crypto"; export type S2RealtimeStreamsOptions = { @@ -258,15 +259,37 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { if (eventType === "batch" && data) { try { const parsed = JSON.parse(data) as { - records: Array<{ body: string; seq_num: number; timestamp: number }>; + records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; }; for (const record of parsed.records) { - const parsedBody = JSON.parse(record.body) as { data: string; id: string }; + // S2 command records (trim/fence) have a single header with + // empty name. Skip — callers want only data + Trigger control + // records. + if (record.headers?.[0]?.[0] === "") { + continue; + } + + // Data records carry a JSON envelope; Trigger control records + // have an empty body and route via headers. Tolerate non-JSON + // bodies so a control record (or a malformed data record) + // doesn't take the whole batch down with it. + let parsedBody: { data: string; id: string } | undefined; + try { + parsedBody = JSON.parse(record.body) as { data: string; id: string }; + } catch { + parsedBody = undefined; + } records.push({ - data: parsedBody.data, - id: parsedBody.id, + data: parsedBody?.data ?? "", + id: parsedBody?.id ?? "", seqNum: record.seq_num, + headers: record.headers, }); } } catch { @@ -294,13 +317,19 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { * Serve SSE from a `Session`-primitive channel addressed by * `(friendlyId, io)`. * - * For `io=out`, peek the tail record first. 
If it's - * `trigger:turn-complete`, the agent has finished a turn and is - * either idle-waiting on `.in` or has exited — either way, no more - * chunks will arrive without further user action. We switch the - * downstream S2 read to `wait=0` (drain whatever's left, close fast) - * and set `X-Session-Settled: true` so the client knows this SSE - * close is terminal instead of the normal 60s long-poll cycle. + * For `io=out`, peek the tail of the stream. If the most recent + * non-command record is a `turn-complete` control record (i.e. the + * agent has finished a turn and is either idle-waiting on `.in` or + * has exited), no more chunks will arrive without further user + * action. We switch the downstream S2 read to `wait=0` (drain + * whatever's left, close fast) and set `X-Session-Settled: true` so + * the client knows this SSE close is terminal instead of the normal + * 60s long-poll cycle. + * + * The actual tail is now usually an S2 `trim` command record (the + * agent appends one after every turn-complete to keep `.out` + * bounded). The peek reads two records and walks past the trim to + * find the turn-complete underneath. * * Mid-turn tail (streaming UIMessageChunk) falls through to the * long-poll path; a crashed-mid-turn stream is indistinguishable @@ -324,13 +353,8 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { // races the newly-triggered turn's first chunk and the SSE closes // before records land. if (io === "out" && options?.peekSettled) { - const lastChunk = await this.#peekLastChunkBody(s2Stream); - const lastChunkType = - lastChunk != null && typeof lastChunk === "object" - ? 
(lastChunk as { type?: unknown }).type - : null; - if (lastChunkType === "trigger:turn-complete") { - settled = true; + settled = await this.#peekIsSettled(s2Stream); + if (settled) { waitSeconds = 0; } } @@ -351,13 +375,21 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { }); } - async #peekLastChunkBody(s2Stream: string): Promise { + /** + * Peek the tail of `.out` and return whether the stream is "settled" — + * i.e. the most recent non-command record is a `turn-complete` control + * record. The agent appends an S2 `trim` command record immediately + * after every turn-complete to keep the stream bounded, so we read two + * tail records and walk past any trim command to find the + * turn-complete underneath. + */ + async #peekIsSettled(s2Stream: string): Promise { const qs = new URLSearchParams(); - // `tail_offset=1` reads one record before the next seq — i.e. the - // most recently appended record. `count=1` caps it to just that - // record. `wait=0` returns immediately with no long-poll. - qs.set("tail_offset", "1"); - qs.set("count", "1"); + // `tail_offset=2` rewinds two seq positions; `count=2` caps it to + // those two records. At steady state these are `[turn-complete, trim]`. + // `wait=0` returns immediately with no long-poll. + qs.set("tail_offset", "2"); + qs.set("count", "2"); qs.set("wait", "0"); let res: Response; @@ -376,13 +408,13 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { ); } catch (err) { this.logger.warn("S2 peek last record: fetch failed", { err, stream: s2Stream }); - return null; + return false; } if (!res.ok) { // 404: stream has never been written to. 416: range not // satisfiable (empty stream). Both mean "nothing to peek." 
- if (res.status === 404 || res.status === 416) return null; + if (res.status === 404 || res.status === 416) return false; const text = await res.text().catch(() => ""); this.logger.warn("S2 peek last record failed", { status: res.status, @@ -390,32 +422,43 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { text, stream: s2Stream, }); - return null; + return false; } + let records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; try { const json = (await res.json()) as { - records?: Array<{ body: string; seq_num: number; timestamp: number }>; + records?: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; }; - const record = json.records?.[0]; - if (!record) return null; - // The record body is a JSON string `{data: , id: partId}`. - // The agent-side writer (`StreamsWriterV2`) hands `appendPart` an - // already-JSON-stringified chunk, so `data` round-trips as a string, - // not an object. Parse it once more to surface the chunk shape. - const envelope = JSON.parse(record.body) as { data: unknown; id: string }; - if (typeof envelope.data === "string") { - try { - return JSON.parse(envelope.data); - } catch { - return envelope.data; - } - } - return envelope.data; + records = json.records ?? []; } catch (err) { this.logger.warn("S2 peek last record: parse failed", { err, stream: s2Stream }); - return null; + return false; + } + + // Walk from most-recent backward, skipping S2 command records + // (`headers[0][0] === ""`). The first non-command record is the + // real tail — settled iff its `trigger-control` header is + // `turn-complete`. 
+ for (let i = records.length - 1; i >= 0; i--) { + const record = records[i]!; + if (record.headers?.[0]?.[0] === "") { + continue; + } + const controlValue = headerValue(record.headers, "trigger-control"); + return controlValue === "turn-complete"; } + return false; } async #streamResponseByName( @@ -548,7 +591,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { basins: { exact: this.basin, }, - ops: ["append", "create-stream"], + // S2 treats `trim` as a separate op from `append` even though + // trim records are appended like any other record. Verified + // empirically: without `"trim"` here, `AppendRecord.trim()` + // writes 403 with "Operation not permitted". `chat.agent`'s + // per-turn trim chain depends on this. + ops: ["append", "create-stream", "trim"], streams: { prefix: this.streamPrefix, }, diff --git a/apps/webapp/app/services/realtime/types.ts b/apps/webapp/app/services/realtime/types.ts index 7161f158a48..f09507997a2 100644 --- a/apps/webapp/app/services/realtime/types.ts +++ b/apps/webapp/app/services/realtime/types.ts @@ -2,6 +2,13 @@ export type StreamRecord = { data: string; id: string; seqNum: number; + /** + * S2 record headers, when the underlying backend is the v2 (S2) shape. + * Undefined or empty for run-scoped Redis streams. First-header empty-name + * is an S2 command record (trim/fence); the parser strips those before + * surfacing the record, so callers never see them. + */ + headers?: Array<[string, string]>; }; // Interface for stream ingestion @@ -36,8 +43,10 @@ export type StreamResponseOptions = { /** * Session-stream-only. When `true`, the responder MAY peek the tail * of `.out` and short-circuit to `wait=0` + `X-Session-Settled: true` - * if the last chunk is a terminal marker (e.g. `trigger:turn-complete`). - * Used by `TriggerChatTransport.reconnectToStream` on page reload. 
+ * if the last record is a terminal marker (a `trigger-control` + * `turn-complete` control record, ignoring any trailing S2 trim + * command record). Used by `TriggerChatTransport.reconnectToStream` + * on page reload. * * When absent/false, the responder keeps the unconditional long-poll * behavior — required on the active send-a-message path where the diff --git a/packages/cli-v3/src/mcp/tools/agentChat.ts b/packages/cli-v3/src/mcp/tools/agentChat.ts index 27965c06b39..29c7051e56c 100644 --- a/packages/cli-v3/src/mcp/tools/agentChat.ts +++ b/packages/cli-v3/src/mcp/tools/agentChat.ts @@ -1,5 +1,10 @@ import { z } from "zod"; -import { ApiClient, SSEStreamSubscription } from "@trigger.dev/core/v3"; +import { + ApiClient, + controlSubtype, + SSEStreamSubscription, + TRIGGER_CONTROL_SUBTYPE, +} from "@trigger.dev/core/v3"; import { toolsMetadata } from "../config.js"; import { CommonProjectsInput } from "../schemas.js"; import { respondWithError, toolHandler } from "../utils.js"; @@ -390,50 +395,55 @@ async function collectAgentResponse( session.lastEventId = value.id; } + // Trigger control records (turn-complete, upgrade-required) ride + // on headers — see `client-protocol.mdx#records-on-session-out`. + // Data records carry UIMessageChunks on `value.chunk`. + const controlValue = controlSubtype(value.headers); + + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + break; + } + + if (controlValue === TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED) { + // Agent requested upgrade — trigger continuation. Same session, + // new run — reuse sessionId, swap runId. Slim-wire: ship only + // the latest user message as the turn-N delta; prior turns + // come back via snapshot+replay on the new run's boot. 
+ const lastUserMessage = [...session.messages] + .reverse() + .find((m) => m.role === "user"); + const previousRunId = session.runId; + const result = await session.apiClient.triggerTask(session.agentId, { + payload: { + message: lastUserMessage, + chatId: session.chatId, + sessionId: session.sessionId, + trigger: "submit-message", + metadata: session.clientData, + continuation: true, + previousRunId, + }, + options: { + payloadType: "application/json", + tags: [`chat:${session.chatId}`], + }, + }); + session.runId = result.id; + // Keep session.lastEventId pointing at the upgrade-required + // record's seq (set above when the part arrived). The recursive + // subscribe resumes right after that marker, so we don't replay + // the entire session.out stream — which would hit a historical + // turn-complete and break the loop with empty/old text. + reader.releaseLock(); + // Recurse — subscribe to the new run's stream (same session.out URL) + return collectAgentResponse(session, depth + 1); + } + // v2 (session) SSE already parses record.body.data, so `chunk` is // the UIMessageChunk object written by the agent. if (value.chunk != null && typeof value.chunk === "object") { const chunk = value.chunk as Record; - if (chunk.type === "trigger:turn-complete") { - break; - } - - if (chunk.type === "trigger:upgrade-required") { - // Agent requested upgrade — trigger continuation. Same session, - // new run — reuse sessionId, swap runId. Slim-wire: ship only - // the latest user message as the turn-N delta; prior turns - // come back via snapshot+replay on the new run's boot. 
- const lastUserMessage = [...session.messages] - .reverse() - .find((m) => m.role === "user"); - const previousRunId = session.runId; - const result = await session.apiClient.triggerTask(session.agentId, { - payload: { - message: lastUserMessage, - chatId: session.chatId, - sessionId: session.sessionId, - trigger: "submit-message", - metadata: session.clientData, - continuation: true, - previousRunId, - }, - options: { - payloadType: "application/json", - tags: [`chat:${session.chatId}`], - }, - }); - session.runId = result.id; - // Keep session.lastEventId pointing at the trigger:upgrade-required - // chunk's id (set at line 370 when the chunk arrived). The recursive - // subscribe resumes right after that marker, so we don't replay the - // entire session.out stream — which would hit a historical - // trigger:turn-complete and break the loop with empty/old text. - reader.releaseLock(); - // Recurse — subscribe to the new run's stream (same session.out URL) - return collectAgentResponse(session, depth + 1); - } - if (chunk.type === "text-delta" && typeof chunk.delta === "string") { text += chunk.delta; // Accumulate into a text part diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index b304300f145..032e88ea66c 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -117,6 +117,10 @@ import { RealtimeRunSkipColumns, type SSEStreamPart, } from "./runStream.js"; +import { + controlSubtype, + type ControlEvent, +} from "../sessionStreams/wireProtocol.js"; import { CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, @@ -1308,6 +1312,12 @@ export class ApiClient { onError?: (error: Error) => void; lastEventId?: string; onPart?: (part: SSEStreamPart) => void; + /** + * Fires when a `trigger-control` record arrives on the stream (e.g. + * `turn-complete`, `upgrade-required`). The control record is never + * enqueued into the consumer stream — handle the event here. 
+ */ + onControl?: (event: ControlEvent) => void; } ): Promise> { const url = `${options?.baseUrl ?? this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}`; @@ -1323,13 +1333,29 @@ export class ApiClient { const stream = await subscription.subscribe(); const onPart = options?.onPart; + const onControl = options?.onControl; return stream.pipeThrough( new TransformStream({ - transform(chunk, controller) { - const data = chunk.chunk as T; - onPart?.(chunk as SSEStreamPart); - controller.enqueue(data); + transform(part, controller) { + // Always surface the raw part via onPart so cursor tracking + // (lastSeqNum, lastEventId) stays correct for both data and + // control records. + onPart?.(part as SSEStreamPart); + + // Trigger control record — route to onControl, never enqueue. + const subtype = controlSubtype(part.headers); + if (subtype) { + onControl?.({ + subtype, + headers: part.headers ?? [], + seqNum: Number.parseInt(part.id, 10) || 0, + timestamp: part.timestamp, + }); + return; + } + + controller.enqueue(part.chunk as T); }, }) ); diff --git a/packages/core/src/v3/apiClient/runStream.test.ts b/packages/core/src/v3/apiClient/runStream.test.ts index a91e70c6e56..4ac2880976a 100644 --- a/packages/core/src/v3/apiClient/runStream.test.ts +++ b/packages/core/src/v3/apiClient/runStream.test.ts @@ -442,3 +442,157 @@ describe("SSEStreamSubscription retry behavior", () => { expect(max - min).toBeGreaterThan(2); }); }); + +describe("SSEStreamSubscription v2 batch parsing — record kinds", () => { + const originalFetch = globalThis.fetch; + + afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + }); + + type ParsedPart = { id: string; chunk: unknown; headers?: ReadonlyArray }; + + // Build a v2 batch SSE response with the given records and close. 
+ function makeBatchResponse( + records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }> + ) { + const body = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode(`event: batch\ndata: ${JSON.stringify({ records })}\n\n`) + ); + controller.close(); + }, + }); + return new Response(body, { + status: 200, + headers: { "Content-Type": "text/event-stream", "X-Stream-Version": "v2" }, + }); + } + + async function drain(stream: ReadableStream) { + const reader = stream.getReader(); + const parts: ParsedPart[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) { + reader.releaseLock(); + return parts; + } + parts.push(value as ParsedPart); + } + } + + it("data records flow through with headers and parsed body", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: JSON.stringify({ data: { type: "text-delta", delta: "hi" }, id: "p1" }), + seq_num: 5, + timestamp: 1700000000000, + headers: [], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + expect(parts).toHaveLength(1); + expect(parts[0]!.id).toBe("5"); + expect(parts[0]!.chunk).toEqual({ type: "text-delta", delta: "hi" }); + expect(parts[0]!.headers).toEqual([]); + }); + + it("S2 command records (empty-name header) are filtered out", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: JSON.stringify({ data: { type: "text-delta", delta: "before" }, id: "p1" }), + seq_num: 4, + timestamp: 1700000000000, + headers: [], + }, + // Trim command record — empty-name header, opaque body. 
+ { + body: "AAAAAAAAAAQ=", + seq_num: 5, + timestamp: 1700000000001, + headers: [["", "trim"]], + }, + { + body: JSON.stringify({ data: { type: "text-delta", delta: "after" }, id: "p2" }), + seq_num: 6, + timestamp: 1700000000002, + headers: [], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + // Trim record stripped — only the two data records survive. + expect(parts).toHaveLength(2); + expect((parts[0]!.chunk as any).delta).toBe("before"); + expect((parts[1]!.chunk as any).delta).toBe("after"); + }); + + it("trigger-control records flow with headers and undefined chunk", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: "", + seq_num: 7, + timestamp: 1700000000003, + headers: [ + ["trigger-control", "turn-complete"], + ["public-access-token", "eyJ..."], + ], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + // Control record passes through so consumers can route by header, + // but its `chunk` is undefined (empty body). 
+ expect(parts).toHaveLength(1); + expect(parts[0]!.chunk).toBeUndefined(); + expect(parts[0]!.headers).toEqual([ + ["trigger-control", "turn-complete"], + ["public-access-token", "eyJ..."], + ]); + }); + + it("malformed data record body does not crash; cursor still advances", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: "not json at all", + seq_num: 8, + timestamp: 1700000000004, + headers: [], + }, + { + body: JSON.stringify({ data: { type: "text-delta", delta: "x" }, id: "p3" }), + seq_num: 9, + timestamp: 1700000000005, + headers: [], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + // Malformed record still propagates with undefined chunk (matches + // control-record shape); next data record is fine. + expect(parts).toHaveLength(2); + expect(parts[0]!.chunk).toBeUndefined(); + expect((parts[1]!.chunk as any).delta).toBe("x"); + }); +}); diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 2152c6c69ca..9b263c4d827 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -176,6 +176,15 @@ export type SSEStreamPart = { id: string; chunk: TChunk; timestamp: number; + /** + * S2 record headers, when the underlying transport is the v2 batch shape + * (Session streams). Undefined for v1 streams. Empty array when the record + * had no headers. First-header empty-name is a Trigger control protocol + * marker (see `trigger-control` records on `session.out`); empty-name + * records that S2 itself interprets as command records (trim/fence) are + * filtered out before reaching this struct. 
+ */ + headers?: Array<[string, string]>; }; // Real implementation for production @@ -374,18 +383,47 @@ export class SSEStreamSubscription implements StreamSubscription { } else { if (chunk.event === "batch") { const data = safeParseJSON(chunk.data) as { - records: Array<{ body: string; seq_num: number; timestamp: number }>; + records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; }; + if (!data || !Array.isArray(data.records)) return; for (const record of data.records) { + // Always advance the resume cursor — even for records we + // skip — so a future Last-Event-ID reconnect lands past + // them. this.lastEventId = record.seq_num.toString(); - const parsedBody = safeParseJSON(record.body) as { data: unknown; id: string }; - if (seenIds.has(parsedBody.id)) continue; - seenIds.add(parsedBody.id); + + // S2 command records (trim, fence) have a single header + // with an empty name. They are S2-interpreted directives + // that consume a seq_num but are not application data. + // Skip enqueue; consumers shouldn't see them. + if (record.headers?.[0]?.[0] === "") { + continue; + } + + // Data record (and Trigger control records — see + // `trigger-control` header in `client-protocol.mdx`). + // Control records have an empty body; data records have a + // JSON envelope. `safeParseJSON("")` returns undefined, + // which is what we want for control records — downstream + // consumers route by `headers` and ignore `chunk`. + const parsedBody = safeParseJSON(record.body) as + | { data: unknown; id: string } + | undefined; + if (parsedBody?.id) { + if (seenIds.has(parsedBody.id)) continue; + seenIds.add(parsedBody.id); + } chunkController.enqueue({ id: record.seq_num.toString(), - chunk: parsedBody.data, + chunk: parsedBody?.data, timestamp: record.timestamp, + headers: record.headers ?? 
[], }); } } diff --git a/packages/core/src/v3/realtime-streams-api.ts b/packages/core/src/v3/realtime-streams-api.ts index e873413e2c3..d9cd9ecfb45 100644 --- a/packages/core/src/v3/realtime-streams-api.ts +++ b/packages/core/src/v3/realtime-streams-api.ts @@ -7,3 +7,9 @@ export const realtimeStreams = RealtimeStreamsAPI.getInstance(); export * from "./realtimeStreams/types.js"; export { SessionStreamInstance } from "./realtimeStreams/sessionStreamInstance.js"; export type { SessionStreamInstanceOptions } from "./realtimeStreams/sessionStreamInstance.js"; +export { + trimSessionStream, + writeSessionControlRecord, + writeTurnCompleteRecord, + writeUpgradeRequiredRecord, +} from "./realtimeStreams/sessionStreamOneshot.js"; diff --git a/packages/core/src/v3/realtimeStreams/index.ts b/packages/core/src/v3/realtimeStreams/index.ts index 80c44f5a3db..b1c20735808 100644 --- a/packages/core/src/v3/realtimeStreams/index.ts +++ b/packages/core/src/v3/realtimeStreams/index.ts @@ -11,6 +11,12 @@ import { // into the core package's internals. 
export { SessionStreamInstance } from "./sessionStreamInstance.js"; export type { SessionStreamInstanceOptions } from "./sessionStreamInstance.js"; +export { + trimSessionStream, + writeSessionControlRecord, + writeTurnCompleteRecord, + writeUpgradeRequiredRecord, +} from "./sessionStreamOneshot.js"; const API_NAME = "realtime-streams"; diff --git a/packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts b/packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts new file mode 100644 index 00000000000..9aa25fa82dd --- /dev/null +++ b/packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts @@ -0,0 +1,136 @@ +import { AppendInput, AppendRecord, S2 } from "@s2-dev/streamstore"; +import type { ApiClient } from "../apiClient/index.js"; +import { + TRIGGER_CONTROL_HEADER, + TRIGGER_CONTROL_SUBTYPE, + type TriggerControlSubtype, +} from "../sessionStreams/wireProtocol.js"; +import type { StreamWriteResult } from "./types.js"; + +/** + * One-shot S2 writes against a Session channel. Used for Trigger control + * records (turn-complete, upgrade-required) and S2 command records (trim). + * + * These differ from the streaming writer (`SessionStreamInstance` / + * `StreamsWriterV2`) in two ways: they emit a single record per call, and + * they need precise control over the record's `headers` + `body` shape — + * which the streaming writer's JSON-envelope encoding doesn't expose. + * + * Each call fetches a fresh S2 access token via `initializeSessionStream` + * and opens a new client. Cheap enough at the rate these are emitted + * (~one of each per turn). + */ + +type IO = "out" | "in"; + +async function getS2Stream(apiClient: ApiClient, sessionId: string, io: IO) { + const response = await apiClient.initializeSessionStream(sessionId, io); + const headers = response.headers ?? 
{}; + const accessToken = headers["x-s2-access-token"]; + const basin = headers["x-s2-basin"]; + const streamName = headers["x-s2-stream-name"]; + const endpoint = headers["x-s2-endpoint"]; + + if (!accessToken || !basin || !streamName) { + throw new Error( + "Session stream initialize did not return S2 credentials — server may be configured for v1 realtime streams, which sessions do not support." + ); + } + + const s2 = new S2({ + accessToken, + ...(endpoint + ? { + endpoints: { + account: endpoint, + basin: endpoint, + }, + } + : {}), + }); + + return s2.basin(basin).stream(streamName); +} + +/** + * Append a single Trigger control record to a Session channel. The record + * carries a `trigger-control` header valued with `subtype`, plus any + * sibling headers (e.g. `public-access-token` on `turn-complete`). Body is + * always empty — control semantics live in the headers. + * + * Returns the ack's last seq_num as `lastEventId`, useful for trim chains. + */ +export async function writeSessionControlRecord( + apiClient: ApiClient, + sessionId: string, + io: IO, + subtype: TriggerControlSubtype | string, + extraHeaders?: ReadonlyArray +): Promise { + const stream = await getS2Stream(apiClient, sessionId, io); + const headers: ReadonlyArray = [ + [TRIGGER_CONTROL_HEADER, subtype], + ...(extraHeaders ?? []), + ]; + const record = AppendRecord.string({ body: "", headers }); + const ack = await stream.append(AppendInput.create([record])); + // S2's `AppendAck.start` is the seq_num of the FIRST record in the batch + // (inclusive); `end` is the seq AFTER the last record (exclusive, equal + // to `tail`). For a single-record append they differ by one — `start` is + // the seq we just wrote, `end` is the next vacant seq. Return `start` + // here so the caller can chain trims against the actual record seq. + return { lastEventId: ack.start.seqNum.toString() }; +} + +/** + * Append an S2 `trim` command record to `session.out`, setting the new + * earliest-readable seq_num. 
Idempotent and monotonic at S2 — the + * effective trim point is `max(existing, min(provided, current_tail))`. + * + * Used after every `turn-complete` to keep `session.out` bounded to + * approximately one turn of records at steady state. + */ +export async function trimSessionStream( + apiClient: ApiClient, + sessionId: string, + earliestSeqNum: number +): Promise { + const stream = await getS2Stream(apiClient, sessionId, "out"); + await stream.append(AppendInput.create([AppendRecord.trim(earliestSeqNum)])); +} + +/** + * Convenience: append a `turn-complete` control record. Carries an + * optional refreshed `publicAccessToken` in a sibling header. + */ +export async function writeTurnCompleteRecord( + apiClient: ApiClient, + sessionId: string, + publicAccessToken?: string +): Promise { + const extra: ReadonlyArray = publicAccessToken + ? [["public-access-token", publicAccessToken]] + : []; + return writeSessionControlRecord( + apiClient, + sessionId, + "out", + TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE, + extra + ); +} + +/** + * Convenience: append an `upgrade-required` control record. 
+ */ +export async function writeUpgradeRequiredRecord( + apiClient: ApiClient, + sessionId: string +): Promise { + return writeSessionControlRecord( + apiClient, + sessionId, + "out", + TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED + ); +} diff --git a/packages/core/src/v3/session-streams-api.ts b/packages/core/src/v3/session-streams-api.ts index afa417a6418..4f5c979aa3b 100644 --- a/packages/core/src/v3/session-streams-api.ts +++ b/packages/core/src/v3/session-streams-api.ts @@ -5,3 +5,5 @@ import { SessionStreamsAPI } from "./sessionStreams/index.js"; export const sessionStreams = SessionStreamsAPI.getInstance(); export * from "./sessionStreams/types.js"; +export * from "./sessionStreams/wireProtocol.js"; +export * from "./sessionStreams/chatSnapshot.js"; diff --git a/packages/core/src/v3/sessionStreams/chatSnapshot.ts b/packages/core/src/v3/sessionStreams/chatSnapshot.ts new file mode 100644 index 00000000000..8abd266d0f7 --- /dev/null +++ b/packages/core/src/v3/sessionStreams/chatSnapshot.ts @@ -0,0 +1,55 @@ +/** + * Persisted chat-snapshot blob. Written by `chat.agent` to S3 after every + * turn completes (when no `hydrateMessages` hook is registered) and read + * back at the start of the next run to seed the accumulator. Also read by + * the Sessions dashboard to render the full conversation transcript + * without re-streaming `session.out` from `seq_num=0`. + * + * S3 key suffix: `sessions/{sessionId}/snapshot.json`. The webapp's + * presigned-URL service prefixes this with `packets/{projectRef}/{envSlug}/`. + * + * `lastOutEventId` is the S2 seq_num (as a string) of the snapshot's + * final `turn-complete` control record. Used to resume `session.out` + * replay from precisely after the snapshot, and as the trim-chain seed + * for the agent's next turn. + * + * `lastOutTimestamp` is the same record's S2 arrival timestamp (ms since + * epoch). Used as the dedup cutoff for `session.in` on OOM-retry boot. 
+ * + * The `version` field is a forward-compat lever: readers that don't + * recognise a version silently fall back to no-snapshot behaviour. + */ + +import { z } from "zod"; + +import type { UIMessage } from "ai"; + +export type ChatSnapshotV1 = { + version: 1; + savedAt: number; + messages: TUIMessage[]; + lastOutEventId?: string; + lastOutTimestamp?: number; +}; + +/** + * Zod schema for `ChatSnapshotV1` with the message shape kept opaque + * (`unknown[]`). The agent runtime types messages strictly via the + * generic parameter; readers that need stricter validation can layer + * their own UIMessage parser on top. + */ +export const ChatSnapshotV1Schema = z.object({ + version: z.literal(1), + savedAt: z.number(), + messages: z.array(z.unknown()), + lastOutEventId: z.string().optional(), + lastOutTimestamp: z.number().optional(), +}); + +/** + * S3 key suffix for a session's snapshot blob. The webapp's presigned + * URL routes prefix this with `packets/{projectRef}/{envSlug}/`. + */ +export function chatSnapshotKeySuffix(sessionId: string): string { + return `sessions/${sessionId}/snapshot.json`; +} diff --git a/packages/core/src/v3/sessionStreams/manager.ts b/packages/core/src/v3/sessionStreams/manager.ts index 0463cb3fb71..09d09f730b2 100644 --- a/packages/core/src/v3/sessionStreams/manager.ts +++ b/packages/core/src/v3/sessionStreams/manager.ts @@ -7,6 +7,7 @@ import { import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js"; import { SessionChannelIO, SessionStreamManager } from "./types.js"; +import { controlSubtype } from "./wireProtocol.js"; type SessionStreamHandler = (data: unknown) => void | Promise; @@ -361,6 +362,13 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.seqNums.set(key, seqNum); } + // Trigger control records (turn-complete, upgrade-required) + // are dispatched out-of-band via `onControl` — they're not + // 
consumer-facing data. Skip the data dispatch path. + if (controlSubtype(part.headers)) { + return; + } + // Min-timestamp filter: drop records older than (or at) the // bound. Used to skip already-processed records on OOM-retry // boot. diff --git a/packages/core/src/v3/sessionStreams/wireProtocol.ts b/packages/core/src/v3/sessionStreams/wireProtocol.ts new file mode 100644 index 00000000000..9abf8e35c16 --- /dev/null +++ b/packages/core/src/v3/sessionStreams/wireProtocol.ts @@ -0,0 +1,81 @@ +/** + * Wire-format constants for records on `session.out` / `session.in`. + * + * Three kinds of records can appear on a Session stream: + * + * 1. **Data records** — JSON body shaped as `{data: , id: + * }`, no special headers. The substance of the conversation. + * + * 2. **Trigger control records** — empty body, `headers` carry `[ + * ["trigger-control", ], ...]` plus any subtype-specific sibling + * headers (e.g. `public-access-token` on `turn-complete`). Routed to a + * consumer's `onControl` callback; never surfaced as data chunks. + * + * 3. **S2 command records** — opaque body, `headers` first entry has an + * empty name (only valid for S2-interpreted directives like `trim` and + * `fence`). Filtered out at the SSE parser; consumers never see them. + * + * See `docs/ai-chat/client-protocol.mdx#records-on-session-out` for the + * customer-facing contract. + */ + +/** Header name carrying the Trigger control subtype on control records. */ +export const TRIGGER_CONTROL_HEADER = "trigger-control" as const; + +/** Header name carrying the refreshed `publicAccessToken` on `turn-complete`. */ +export const PUBLIC_ACCESS_TOKEN_HEADER = "public-access-token" as const; + +/** Header name carrying the agent's last S2 event id on a handover bridge. 
*/ +export const SESSION_STATE_LAST_EVENT_ID_HEADER = "last-event-id" as const; + +export const TRIGGER_CONTROL_SUBTYPE = { + TURN_COMPLETE: "turn-complete", + UPGRADE_REQUIRED: "upgrade-required", +} as const; + +export type TriggerControlSubtype = + (typeof TRIGGER_CONTROL_SUBTYPE)[keyof typeof TRIGGER_CONTROL_SUBTYPE]; + +/** Read a single header value by name. Returns the first match. */ +export function headerValue( + headers: ReadonlyArray | undefined, + name: string +): string | undefined { + if (!headers) return undefined; + for (const entry of headers) { + if (entry?.[0] === name) return entry[1]; + } + return undefined; +} + +/** + * Return the Trigger control subtype carried by a record's headers, if any. + * Returns `undefined` for data records and S2 command records. + */ +export function controlSubtype( + headers: ReadonlyArray | undefined +): string | undefined { + return headerValue(headers, TRIGGER_CONTROL_HEADER); +} + +/** + * Is this record an S2 command record? Detected via the empty-name first + * header, which S2 permits only for command records (trim/fence). + */ +export function isS2CommandRecord( + headers: ReadonlyArray | undefined +): boolean { + return headers?.[0]?.[0] === ""; +} + +/** Event payload delivered to a Session-stream `onControl` callback. */ +export type ControlEvent = { + /** Subtype value from the `trigger-control` header (e.g. `turn-complete`). */ + subtype: string; + /** All headers on the underlying record. Read additional metadata here. */ + headers: ReadonlyArray; + /** S2 sequence number of the control record. */ + seqNum: number; + /** S2 arrival timestamp of the control record (ms since epoch). 
*/ + timestamp: number; +}; diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 63a129ae987..070a94bc8d0 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -2,7 +2,9 @@ import { accessoryAttributes, AnyTask, apiClientManager, + controlSubtype, getSchemaParseFn, + headerValue, InputStreamOncePromise, type InputStreamOnceOptions, type InputStreamWaitOptions, @@ -31,6 +33,7 @@ import { type TaskSchema, type TaskRunContext, type TaskWithSchema, + TRIGGER_CONTROL_SUBTYPE, type WriterStreamOptions, } from "@trigger.dev/core/v3"; import type { @@ -42,7 +45,7 @@ import type { UIMessageStreamOptions, LanguageModelUsage, } from "ai"; -import type { StreamWriteResult } from "@trigger.dev/core/v3"; +import type { ChatSnapshotV1, StreamWriteResult } from "@trigger.dev/core/v3"; import { convertToModelMessages, dynamicTool, @@ -132,19 +135,35 @@ const chatTurnContextKey = locals.create("chat.turnContext"); const chatSessionHandleKey = locals.create("chat.sessionHandle"); /** - * Scan `session.out` for the latest `trigger:turn-complete` chunk and + * S2 seq_num of the most recent `turn-complete` control record written by + * this worker. Read by `writeTurnCompleteChunk` to know what to trim back + * to when the next turn finishes, keeping `session.out` bounded to ~one + * turn at steady state. + * + * Seeded at boot from `ChatSnapshotV1.lastOutEventId` (which is exactly + * the previous turn-complete's seq_num). Wrapped in a mutable holder so + * `writeTurnCompleteChunk` can advance it without going through a setter. + * @internal + */ +const lastTurnCompleteSeqNumKey = locals.create<{ value: number | undefined }>( + "chat.lastTurnCompleteSeqNum" +); + +/** + * Scan `session.out` for the latest `turn-complete` control record and * return its SSE timestamp. 
Used at OOM-retry boot to derive a * lower-bound timestamp for the `session.in` filter — records older * than `T_last_complete` belong to turns that already completed on the * prior attempt and are dropped before they reach the turn loop. * - * Implementation is a streaming scan: subscribes via the existing SSE - * endpoint with a short `timeoutInSeconds`, processes each part inline, - * and discards the chunk body so memory stays O(1) regardless of how - * many records are on `session.out`. Bandwidth scales linearly with - * stream length but the scan only fires on retry — a rare event. + * Implementation streams the SSE endpoint and listens for `turn-complete` + * via the transport's `onControl` callback; the data-chunk for-await is + * just there to drive the stream. The scan is naturally O(1 turn) now + * that `session.out` is bounded to roughly one turn at steady state + * (every successful turn-complete is followed by an S2 trim back to the + * previous one — see `writeTurnCompleteChunk`). * - * Returns `undefined` if no `trigger:turn-complete` chunk has been + * Returns `undefined` if no `turn-complete` control record has been * written yet (first-turn OOM, no completed turns to dedup against). * @internal */ @@ -155,23 +174,15 @@ async function findLatestTurnCompleteTimestamp( let latestTs: number | undefined; const stream = await apiClient.subscribeToSessionStream(chatId, "out", { timeoutInSeconds: 1, - onPart: (part) => { - let chunk: unknown = part.chunk; - if (typeof chunk === "string") { - try { - chunk = JSON.parse(chunk); - } catch { - return; - } - } - if (chunk && typeof chunk === "object" && (chunk as { type?: unknown }).type === "trigger:turn-complete") { - latestTs = part.timestamp; + onControl: (event) => { + if (event.subtype === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + latestTs = event.timestamp; } }, }); - // Drain the stream to drive `onPart`. 
We don't accumulate the chunks — - // each iteration discards the data immediately, so a long session.out - // doesn't blow memory on the retry-boot worker. + // Drain the stream so the underlying SSE reader runs to completion. We + // don't accumulate chunks; `onControl` fires inline as turn-complete + // records arrive. for await (const _ of stream) { // intentionally empty } @@ -182,23 +193,17 @@ async function findLatestTurnCompleteTimestamp( * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the * accumulator with prior conversation state, replacing the old wire-borne - * full-history seed. Only the runtime owns this format — customers never - * touch it. + * full-history seed. * - * `lastOutEventId` is the SSE Last-Event-ID after the snapshot's final - * chunk, used to resume `session.out` replay from precisely after the - * snapshot. `lastOutTimestamp` is the same chunk's timestamp, used to - * skip `findLatestTurnCompleteTimestamp` on OOM retry boot. + * The shape is shared with the Sessions dashboard (which reads the same + * blob to render the full conversation transcript) via + * `@trigger.dev/core/v3`. Customer code shouldn't reach in here — the + * SDK transports surface the messages through the standard `messages` + * accumulator. * * @internal */ -export type ChatSnapshotV1 = { - version: 1; - savedAt: number; - messages: TUIMessage[]; - lastOutEventId?: string; - lastOutTimestamp?: number; -}; +export type { ChatSnapshotV1 } from "@trigger.dev/core/v3"; /** * S3 key suffix for a session's snapshot blob. The webapp's presigned-URL @@ -4570,6 +4575,9 @@ function chatAgent< // `chat.createStartSessionAction` or browser-direct) before this // run is triggered — no client-side upsert needed here. 
locals.set(chatSessionHandleKey, sessions.open(payload.chatId)); + // Mutable holder; advances in `writeTurnCompleteChunk` after each turn + // and is the trim target for the NEXT turn's trim record. + locals.set(lastTurnCompleteSeqNumKey, { value: undefined }); taskContext.setConversationId(payload.chatId); // Stamp `gen_ai.conversation.id` on the run-level span. Every @@ -4651,6 +4659,20 @@ function chatAgent< }); } + // Seed the trim chain from the snapshot's `lastOutEventId` (the SSE + // id of the previous turn's `turn-complete` control record). The + // first turn-complete this worker writes will then trim back to it. + // Without seeding, the new worker would emit no trim on its first + // turn (chain self-bootstraps from turn 2), so this is purely an + // optimization to keep continuation runs bounded from the first turn. + if (bootSnapshot?.lastOutEventId !== undefined) { + const seeded = Number.parseInt(bootSnapshot.lastOutEventId, 10); + if (Number.isFinite(seeded)) { + const slot = locals.get(lastTurnCompleteSeqNumKey); + if (slot) slot.value = seeded; + } + } + try { replayed = await tracer.startActiveSpan("chat.boot.replay", async () => replaySessionOutTail(sessionIdForSnapshot, { @@ -8657,27 +8679,61 @@ export const chat = { }; /** - * Writes a turn-complete control chunk to the chat output stream. - * The frontend transport intercepts this to close the ReadableStream for the current turn. + * Writes a `turn-complete` control record to the chat output stream and, + * if we have a prior turn-complete's seq_num, appends an S2 `trim` command + * record back to it — keeping `session.out` bounded to roughly one turn + * at steady state. + * + * The control record's body is empty; `trigger-control: turn-complete` + * plus an optional `public-access-token` ride on the headers (see + * `docs/ai-chat/client-protocol.mdx`). SDK transports filter it from the + * consumer chunk stream and surface it via `onControl` / `onTurnComplete`. 
+ * + * Trim is opportunistic and monotonic at S2's layer. A failed trim is + * logged and swallowed; the next turn will retry against a fresher + * target seq_num. + * * @internal */ async function writeTurnCompleteChunk( - chatId?: string, + _chatId?: string, publicAccessToken?: string ): Promise { - const { waitUntilComplete } = chatStream.writer({ - spanName: "turn complete", - collapsed: true, - execute: ({ write }) => { - // Transport-intercepted control chunk — not a valid UIMessageChunk - // type but travels on the same session.out stream. - write({ - type: "trigger:turn-complete", - ...(publicAccessToken ? { publicAccessToken } : {}), - } as unknown as UIMessageChunk); - }, - }); - return await waitUntilComplete(); + const session = getChatSession(); + + // 1. Write the turn-complete control record. The ack's `lastEventId` is + // this record's seq_num — that's the trim target for the NEXT turn. + const extraHeaders: ReadonlyArray = publicAccessToken + ? [["public-access-token", publicAccessToken]] + : []; + const result = await session.out.writeControl( + TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE, + extraHeaders + ); + const T_N = result.lastEventId ? Number.parseInt(result.lastEventId, 10) : undefined; + + // 2. Trim back to the previous turn-complete, if we have one. Skipping on + // first-turn-ever (or first turn post-OOM without a snapshot seed) is + // fine — the chain catches up next turn. + const slot = locals.get(lastTurnCompleteSeqNumKey); + const prev = slot?.value; + if (slot && prev !== undefined) { + try { + await session.out.trimTo(prev); + } catch (err) { + logger.warn("chat.agent: trim failed; will retry next turn", { + error: err instanceof Error ? err.message : String(err), + prev, + }); + } + } + + // 3. Advance the slot so the next turn-complete trims back to this one. 
+ if (slot && T_N !== undefined && Number.isFinite(T_N)) { + slot.value = T_N; + } + + return result; } /** @@ -8725,16 +8781,8 @@ async function writeUpgradeRequiredChunk(): Promise { } } - const { waitUntilComplete } = chatStream.writer({ - spanName: "upgrade required", - collapsed: true, - execute: ({ write }) => { - write({ - type: "trigger:upgrade-required", - } as unknown as UIMessageChunk); - }, - }); - return await waitUntilComplete(); + const session = getChatSession(); + return session.out.writeControl(TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED); } /** diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 639012cdb88..6fc88df782c 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -19,7 +19,13 @@ import type { SessionTriggerConfig, Task } from "@trigger.dev/core/v3"; import type { ModelMessage, UIMessage, UIMessageChunk } from "ai"; import { readUIMessageStream } from "ai"; -import { ApiClient, SSEStreamSubscription, apiClientManager } from "@trigger.dev/core/v3"; +import { + ApiClient, + apiClientManager, + controlSubtype, + SSEStreamSubscription, + TRIGGER_CONTROL_SUBTYPE, +} from "@trigger.dev/core/v3"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; import { sessions } from "./sessions.js"; @@ -710,33 +716,19 @@ export class AgentChat { if (value.id) state.lastEventId = value.id; - // Session records arrive as raw JSON strings (the server - // wraps `{data, id}` on S2). Parse back into objects so - // the control-flow below can inspect chunk.type. 
- let chunkObj: Record | null = null; - if (value.chunk != null) { - if (typeof value.chunk === "string") { - try { - chunkObj = JSON.parse(value.chunk) as Record; - } catch { - chunkObj = null; - } - } else if (typeof value.chunk === "object") { - chunkObj = value.chunk as Record; - } - } - if (!chunkObj) continue; - - const chunk = chunkObj; + // Trigger control records (turn-complete, upgrade-required) + // route by header — see `client-protocol.mdx`. Their bodies + // are empty; everything substantive is on `value.headers`. + const controlValue = controlSubtype(value.headers); if (state.skipToTurnComplete) { - if (chunk.type === "trigger:turn-complete") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { state.skipToTurnComplete = false; } continue; } - if (chunk.type === "trigger:upgrade-required") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED) { // Server has already triggered the new run via // `end-and-continue`; v2's chunks arrive on the same // S2 stream. Filter the marker for cleanliness and @@ -744,7 +736,7 @@ export class AgentChat { continue; } - if (chunk.type === "trigger:turn-complete") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { // Customer's callback may be async (e.g. persisting // lastEventId to a DB). Wrap so a rejected Promise // doesn't surface as an unhandled rejection — that @@ -764,7 +756,11 @@ export class AgentChat { return; } - controller.enqueue(chunk as unknown as UIMessageChunk); + // Data record — `value.chunk` is the parsed UIMessageChunk + // (the SSE parser does the JSON envelope unwrap). Drop + // empty/malformed payloads defensively. 
+ if (value.chunk == null) continue; + controller.enqueue(value.chunk as UIMessageChunk); } } catch (readError) { reader.releaseLock(); diff --git a/packages/trigger-sdk/src/v3/chat-server.ts b/packages/trigger-sdk/src/v3/chat-server.ts index 7ef090a6e63..fe71fe3d86b 100644 --- a/packages/trigger-sdk/src/v3/chat-server.ts +++ b/packages/trigger-sdk/src/v3/chat-server.ts @@ -54,7 +54,12 @@ * helpers like `stepCountIs` / `convertToModelMessages`). */ -import { ApiClient, SessionStreamInstance, apiClientManager } from "@trigger.dev/core/v3"; +import { + ApiClient, + SessionStreamInstance, + TRIGGER_CONTROL_SUBTYPE, + apiClientManager, +} from "@trigger.dev/core/v3"; import { convertToModelMessages, generateId as generateAssistantMessageId, @@ -551,7 +556,17 @@ async function openHandoverSession(opts: { // transport can hydrate `state.lastEventId` for turn 2's // subscribe — without it, turn 2 reads session.out from the // start and replays turn 1 to the user. + // + // The agent's `turn-complete` control record is now header- + // form on S2 (see `client-protocol.mdx`), so the + // `for await (const chunk of agentStream)` loop below NEVER + // sees it as a data chunk — `subscribeToSessionStream` routes + // it to `onControl`. Use that to know when to stop and + // synthesise the data-chunk shape the browser bridge still + // expects (this HTTP response stream is NOT S2 and keeps the + // legacy chunk shape for the customer-server-to-browser hop). let latestEventId: string | undefined; + let turnComplete = false; const agentStream = await apiClient.subscribeToSessionStream( chatId, "out", @@ -563,22 +578,29 @@ async function openHandoverSession(opts: { onPart: (part) => { if (part.id) latestEventId = part.id; }, + onControl: (event) => { + if (event.subtype === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + turnComplete = true; + // Synthesise the data-chunk shape for the browser + // bridge. 
The customer-server-to-browser response is + // not S2; it keeps the legacy chunk shape so the + // browser's transport can recognise turn-complete the + // same way it always has. + controller.enqueue({ + type: "trigger:turn-complete", + } as unknown as UIMessageChunk); + } + }, } ); for await (const chunk of agentStream) { + // Data records only — control records are routed via + // `onControl` above. Stop reading as soon as we see the + // turn-complete control event (the loop may have one more + // data record buffered, but that's fine — we break out). controller.enqueue(chunk); - // The agent's run-loop emits `trigger:turn-complete` when - // the turn finishes. That's our cue to close — anything - // after is the next turn (which goes via the direct - // `session.in`/`session.out` path, not this endpoint). - if ( - chunk && - typeof chunk === "object" && - (chunk as { type?: unknown }).type === "trigger:turn-complete" - ) { - break; - } + if (turnComplete) break; } // Final control chunk: hand the browser transport the diff --git a/packages/trigger-sdk/src/v3/chat.test.ts b/packages/trigger-sdk/src/v3/chat.test.ts index eaa69bed934..6d034e7cb5e 100644 --- a/packages/trigger-sdk/src/v3/chat.test.ts +++ b/packages/trigger-sdk/src/v3/chat.test.ts @@ -19,10 +19,45 @@ import { TriggerChatTransport, createChatTransport } from "./chat.js"; * parse-once, `=== "object"` → use as-is). We pick the object form * here for test simplicity. */ +/** + * Encode test chunks as a session-stream v2 SSE batch event. Each chunk + * becomes one S2 record; chunks of shape `{type: "trigger:turn-complete"}` + * or `{type: "trigger:upgrade-required"}` are translated into header-form + * control records (empty body, `trigger-control` header) to match the + * production wire shape. 
+ */ function sseEncode(chunks: (UIMessageChunk | Record)[]): string { - return chunks - .map((chunk, i) => `id: ${i}\ndata: ${JSON.stringify(chunk)}\n\n`) - .join(""); + let nextSeq = 1; + const records = chunks.map((chunk, i) => { + const partId = `p-${i}`; + const type = (chunk as { type?: unknown }).type; + if (type === "trigger:turn-complete") { + const headers: Array<[string, string]> = [["trigger-control", "turn-complete"]]; + const token = (chunk as { publicAccessToken?: string }).publicAccessToken; + if (token) headers.push(["public-access-token", token]); + return { + body: "", + seq_num: nextSeq++, + timestamp: 1700000000000 + i, + headers, + }; + } + if (type === "trigger:upgrade-required") { + return { + body: "", + seq_num: nextSeq++, + timestamp: 1700000000000 + i, + headers: [["trigger-control", "upgrade-required"]], + }; + } + return { + body: JSON.stringify({ data: chunk, id: partId }), + seq_num: nextSeq++, + timestamp: 1700000000000 + i, + headers: [], + }; + }); + return `event: batch\ndata: ${JSON.stringify({ records })}\n\n`; } function createSSEStream(sseText: string): ReadableStream { @@ -127,7 +162,13 @@ function defaultSseResponse( ): Response { return new Response(createSSEStream(sseEncode(chunks)), { status: 200, - headers: { "content-type": "text/event-stream" }, + headers: { + "content-type": "text/event-stream", + // Session streams are always v2 in production — batch format + // with one S2 record per SSE event. The legacy v1 path is for + // run-scoped Redis streams. 
+ "X-Stream-Version": "v2", + }, }); } diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 980d34c1f04..5b30d56ca45 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -24,7 +24,14 @@ */ import type { ChatTransport, UIMessage, UIMessageChunk, ChatRequestOptions } from "ai"; -import { ApiClient, SSEStreamSubscription } from "@trigger.dev/core/v3"; +import { + ApiClient, + controlSubtype, + headerValue, + PUBLIC_ACCESS_TOKEN_HEADER, + SSEStreamSubscription, + TRIGGER_CONTROL_SUBTYPE, +} from "@trigger.dev/core/v3"; import { ChatTabCoordinator } from "./chat-tab-coordinator.js"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; @@ -980,10 +987,12 @@ export class TriggerChatTransport implements ChatTransport { /** * Open an SSE subscription to the session's `.out` stream and pipe - * UIMessageChunks through to the AI SDK. Filters control chunks - * (`trigger:turn-complete`, `trigger:upgrade-required`) — the latter - * is purely telemetry now since the server handles the run swap - * inline (see `end-and-continue`). + * UIMessageChunks through to the AI SDK. Trigger control records + * (`turn-complete`, `upgrade-required` — see `trigger-control` header + * on `client-protocol.mdx#records-on-session-out`) are routed by + * header and never reach the consumer. `upgrade-required` is purely + * telemetry now since the server handles the run swap inline (see + * `end-and-continue`). 
*/ private subscribeToSessionStream( state: ChatSessionState, @@ -1144,7 +1153,12 @@ export class TriggerChatTransport implements ChatTransport { } while (true) { - let value: { id: string; chunk: unknown; timestamp: number }; + let value: { + id: string; + chunk: unknown; + timestamp: number; + headers?: ReadonlyArray; + }; if (primed !== undefined) { value = primed; primed = undefined; @@ -1166,32 +1180,20 @@ export class TriggerChatTransport implements ChatTransport { if (value.id) state.lastEventId = value.id; - // Session SSE delivers raw record bodies as strings (the - // server wraps them in `{data, id}` for S2). Parse so the - // rest of the loop can treat chunks as objects. - let chunkObj: Record | null = null; - if (value.chunk != null) { - if (typeof value.chunk === "string") { - try { - chunkObj = JSON.parse(value.chunk) as Record; - } catch { - chunkObj = null; - } - } else if (typeof value.chunk === "object") { - chunkObj = value.chunk as Record; - } - } - if (!chunkObj) continue; - const chunk = chunkObj; + // Trigger control record (turn-complete, upgrade-required) — + // routed by header, body is empty. Detect via the + // `trigger-control` header on the SSE record. Data records + // (UIMessageChunks) fall through to the chunk path below. + const controlValue = controlSubtype(value.headers); if (state.skipToTurnComplete) { - if (chunk.type === "trigger:turn-complete") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { state.skipToTurnComplete = false; } continue; } - if (chunk.type === "trigger:upgrade-required") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED) { // Server has already triggered the new run via // `end-and-continue`; the next chunks on this same `.out` // stream come from v2. 
Filter the marker for cleanliness @@ -1199,9 +1201,10 @@ export class TriggerChatTransport implements ChatTransport { continue; } - if (chunk.type === "trigger:turn-complete") { - if (typeof chunk.publicAccessToken === "string") { - state.publicAccessToken = chunk.publicAccessToken; + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + const refreshedToken = headerValue(value.headers, PUBLIC_ACCESS_TOKEN_HEADER); + if (refreshedToken) { + state.publicAccessToken = refreshedToken; } state.isStreaming = false; this.notifySessionChange(chatId, state); @@ -1221,7 +1224,11 @@ export class TriggerChatTransport implements ChatTransport { return; } - controller.enqueue(chunk as unknown as UIMessageChunk); + // Data record — `value.chunk` is the parsed UIMessageChunk + // unwrapped from the S2 record envelope (the parser does the + // JSON unwrap). Drop empty/malformed payloads defensively. + if (value.chunk == null) continue; + controller.enqueue(value.chunk as UIMessageChunk); } } catch (error) { if (error instanceof Error && error.name === "AbortError") { diff --git a/packages/trigger-sdk/src/v3/sessions.ts b/packages/trigger-sdk/src/v3/sessions.ts index 3763a27146a..159874a9bf1 100644 --- a/packages/trigger-sdk/src/v3/sessions.ts +++ b/packages/trigger-sdk/src/v3/sessions.ts @@ -31,7 +31,10 @@ import { runtime, sessionStreams, taskContext, + trimSessionStream, + writeSessionControlRecord, } from "@trigger.dev/core/v3"; +import type { StreamWriteResult } from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { SpanStatusCode } from "@opentelemetry/api"; import { tracer } from "./tracer.js"; @@ -464,6 +467,38 @@ export class SessionOutputChannel { throw error; } } + + /** + * Write a single Trigger control record to `.out`. The record carries a + * `trigger-control` header valued with `subtype` plus any sibling + * `extraHeaders`; the body is empty. 
Control records are filtered out of + * the consumer-facing chunk stream by the SDK transport — readers route + * them via the `onControl` callback instead. + * + * The returned `lastEventId` is the S2 seq_num of the written record, + * useful for trim chains (e.g. trim back to the previous turn-complete). + */ + async writeControl( + subtype: string, + extraHeaders?: ReadonlyArray + ): Promise { + const apiClient = apiClientManager.clientOrThrow(); + return writeSessionControlRecord(apiClient, this.sessionId, "out", subtype, extraHeaders); + } + + /** + * Append an S2 `trim` command record to `.out`. Records with seq_num + * less than `earliestSeqNum` are eventually removed from the stream. + * + * Idempotent and monotonic at S2's layer (`max(existing, min(provided, + * current_tail))`) — backward trims are silently no-ops for deletion + * but still consume a seq_num. Used by `chat.agent`'s turn loop to + * keep `session.out` bounded to roughly one turn at steady state. + */ + async trimTo(earliestSeqNum: number): Promise { + const apiClient = apiClientManager.clientOrThrow(); + await trimSessionStream(apiClient, this.sessionId, earliestSeqNum); + } } /** diff --git a/packages/trigger-sdk/src/v3/test/test-session-handle.ts b/packages/trigger-sdk/src/v3/test/test-session-handle.ts index 71bc9d8d7b3..93d6146ddd5 100644 --- a/packages/trigger-sdk/src/v3/test/test-session-handle.ts +++ b/packages/trigger-sdk/src/v3/test/test-session-handle.ts @@ -242,6 +242,39 @@ export class TestSessionOutputChannel extends SessionOutputChannel { "inspect `harness.allChunks` / `harness.allRawChunks` instead." ); } + + /** + * Override the one-shot control-record path. In production this goes + * direct to S2 with header-form records; in tests we project it back + * into the chunk-shape the harness already understands (the listener + * watches for `{type: "trigger:turn-complete"}` to drive turn-complete + * latches). 
Returns an empty `StreamWriteResult` — tests don't observe + * the seq_num, and trim seeding only matters in production. + */ + async writeControl( + subtype: string, + extraHeaders?: ReadonlyArray + ): Promise { + const synthetic: Record = { type: `trigger:${subtype}` }; + if (extraHeaders) { + for (const [name, value] of extraHeaders) { + if (name === "public-access-token") { + synthetic.publicAccessToken = value; + } + } + } + notify(this.state, synthetic); + return {}; + } + + /** + * No-op in the mock harness. Production trims keep `session.out` bounded; + * the in-memory `state.chunks` array doesn't need trimming and tests + * that care about trim behaviour exercise it via the real S2 code path. + */ + async trimTo(_earliestSeqNum: number): Promise { + // Intentionally a no-op for the mock harness. + } } /** From 20fb74aa18a2b4bb0f6b4ff5e0943069e01927d0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 17 May 2026 09:04:15 +0100 Subject: [PATCH 2/4] fix(core,sdk): resume session.in via seq cursor on turn-complete header Replaces the wall-clock dedup cutoff on session.in with a seq-based resume cursor carried on each turn-complete control record as a `session-in-event-id` sibling header. On worker boot the cursor is read off .out's latest turn-complete and seeds .in's SSE subscription so already-processed user messages aren't replayed from S2. The timestamp model had a narrow race: the snapshot's lastOutTimestamp was Date.now() sampled after turn-complete landed on S2, so a user message arriving between turn-complete and snapshot-write could be silently filtered as "old" on the next worker boot. Seq numbers are unique, total-ordered, and stamped by S2, so the same cutoff now falls out without any clock involved. The sessionStreams manager grows a second cursor (lastDispatchedSeqNum) distinct from the existing received cursor. It advances only when a record is delivered to a once()/wait() consumer or shifted off the buffer into one. 
The agent persists it on turn-complete and seeds it back at the next worker's boot before any .in SSE subscribe opens. lastOutTimestamp is gone from the snapshot; the rescan is O(1 turn) since .out is bounded. Also bundles four small PR-review fixes: - runStream SSEStreamPart doc had the trigger-control vs S2-command-record marker reversed - agentChat removed a redundant reader.releaseLock() before a recursive return that the finally block already covers - sessions surfaces onControl on SessionSubscribeOptions so consumers of the raw .out.read() API see header-form control records - chatSnapshot drops the now-unused lastOutTimestamp field --- packages/cli-v3/src/mcp/tools/agentChat.ts | 5 +- packages/core/src/v3/apiClient/runStream.ts | 8 +- .../src/v3/sessionStreams/chatSnapshot.ts | 5 - packages/core/src/v3/sessionStreams/index.ts | 12 ++ .../core/src/v3/sessionStreams/manager.ts | 71 ++++++++++- .../core/src/v3/sessionStreams/noopManager.ts | 10 ++ packages/core/src/v3/sessionStreams/types.ts | 22 ++++ .../src/v3/sessionStreams/wireProtocol.ts | 12 ++ .../v3/test/test-session-stream-manager.ts | 15 +++ packages/trigger-sdk/src/v3/ai.ts | 112 ++++++++++-------- packages/trigger-sdk/src/v3/sessions.ts | 24 +++- 11 files changed, 229 insertions(+), 67 deletions(-) diff --git a/packages/cli-v3/src/mcp/tools/agentChat.ts b/packages/cli-v3/src/mcp/tools/agentChat.ts index 29c7051e56c..078df1c46b1 100644 --- a/packages/cli-v3/src/mcp/tools/agentChat.ts +++ b/packages/cli-v3/src/mcp/tools/agentChat.ts @@ -433,9 +433,8 @@ async function collectAgentResponse( // record's seq (set above when the part arrived). The recursive // subscribe resumes right after that marker, so we don't replay // the entire session.out stream — which would hit a historical - // turn-complete and break the loop with empty/old text. - reader.releaseLock(); - // Recurse — subscribe to the new run's stream (same session.out URL) + // turn-complete and break the loop with empty/old text. 
The outer + // `finally` block releases the reader before the recursion runs. return collectAgentResponse(session, depth + 1); } diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 9b263c4d827..217b7a51082 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -179,10 +179,10 @@ export type SSEStreamPart = { /** * S2 record headers, when the underlying transport is the v2 batch shape * (Session streams). Undefined for v1 streams. Empty array when the record - * had no headers. First-header empty-name is a Trigger control protocol - * marker (see `trigger-control` records on `session.out`); empty-name - * records that S2 itself interprets as command records (trim/fence) are - * filtered out before reaching this struct. + * had no headers. Trigger control records carry a `trigger-control` named + * header (see `trigger-control` records on `session.out`) and may reach + * this struct. S2 command records (trim/fence) are identified by an + * empty-name first header and are filtered out before enqueue. */ headers?: Array<[string, string]>; }; diff --git a/packages/core/src/v3/sessionStreams/chatSnapshot.ts b/packages/core/src/v3/sessionStreams/chatSnapshot.ts index 8abd266d0f7..f30ea47b46b 100644 --- a/packages/core/src/v3/sessionStreams/chatSnapshot.ts +++ b/packages/core/src/v3/sessionStreams/chatSnapshot.ts @@ -13,9 +13,6 @@ * replay from precisely after the snapshot, and as the trim-chain seed * for the agent's next turn. * - * `lastOutTimestamp` is the same record's S2 arrival timestamp (ms since - * epoch). Used as the dedup cutoff for `session.in` on OOM-retry boot. - * * The `version` field is a forward-compat lever: readers that don't * recognise a version silently fall back to no-snapshot behaviour. 
*/ @@ -29,7 +26,6 @@ export type ChatSnapshotV1 = { savedAt: number; messages: TUIMessage[]; lastOutEventId?: string; - lastOutTimestamp?: number; }; /** @@ -43,7 +39,6 @@ export const ChatSnapshotV1Schema = z.object({ savedAt: z.number(), messages: z.array(z.unknown()), lastOutEventId: z.string().optional(), - lastOutTimestamp: z.number().optional(), }); /** diff --git a/packages/core/src/v3/sessionStreams/index.ts b/packages/core/src/v3/sessionStreams/index.ts index 75b372c8314..7ea17261632 100644 --- a/packages/core/src/v3/sessionStreams/index.ts +++ b/packages/core/src/v3/sessionStreams/index.ts @@ -59,6 +59,18 @@ export class SessionStreamsAPI implements SessionStreamManager { this.#getManager().setLastSeqNum(sessionId, io, seqNum); } + public lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined { + return this.#getManager().lastDispatchedSeqNum(sessionId, io); + } + + public setLastDispatchedSeqNum( + sessionId: string, + io: SessionChannelIO, + seqNum: number + ): void { + this.#getManager().setLastDispatchedSeqNum(sessionId, io, seqNum); + } + public setMinTimestamp( sessionId: string, io: SessionChannelIO, diff --git a/packages/core/src/v3/sessionStreams/manager.ts b/packages/core/src/v3/sessionStreams/manager.ts index 09d09f730b2..b6cd6a740de 100644 --- a/packages/core/src/v3/sessionStreams/manager.ts +++ b/packages/core/src/v3/sessionStreams/manager.ts @@ -44,6 +44,12 @@ export class StandardSessionStreamManager implements SessionStreamManager { private handlers = new Map>(); private onceWaiters = new Map(); private buffer = new Map(); + // Parallel to `buffer`: the SSE seq_num of each buffered record. Same + // length and order as `buffer[key]`. Used so that when `once()` shifts + // a buffered record into a waiter, the cursor (`lastDispatchedSeqNums`) + // can advance to that record's seq. Kept as a separate map so the + // existing `peek()` shape (returns `unknown`) stays unchanged. 
+ private bufferSeqNums = new Map(); private tails = new Map(); // Per-stream lower-bound timestamp filter. When set, records whose // SSE timestamp is <= the bound are dropped before dispatch — used by @@ -59,6 +65,15 @@ export class StandardSessionStreamManager implements SessionStreamManager { // that's already being delivered out-of-band via the waitpoint. private explicitlyDisconnected = new Set(); private seqNums = new Map(); + // Highest seq_num that has been *consumed* (delivered to a once() + // waiter or shifted off the buffer into a once() caller) on a channel. + // Distinct from `seqNums`, which advances whenever any record is + // received from SSE — even ones still sitting in the local buffer. + // The committed-consume cursor is what gets persisted on the + // turn-complete control record's `session-in-event-id` header so the + // next worker boot can resume `.in` from this point without + // re-delivering already-handled user messages. + private lastDispatchedSeqNums = new Map(); // Reconnect attempt counter per key. Drives the exponential backoff // applied by `#ensureTailConnected`'s `.finally` so a persistent // backend failure (auth rejection, 5xx, DNS, etc.) 
doesn't reconnect @@ -123,8 +138,14 @@ export class StandardSessionStreamManager implements SessionStreamManager { const buffered = this.buffer.get(key); if (buffered && buffered.length > 0) { const data = buffered.shift()!; + const seqList = this.bufferSeqNums.get(key); + const shiftedSeqNum = seqList?.shift(); if (buffered.length === 0) { this.buffer.delete(key); + this.bufferSeqNums.delete(key); + } + if (shiftedSeqNum !== undefined) { + this.#advanceLastDispatched(key, shiftedSeqNum); } return new InputStreamOncePromise((resolve) => { resolve({ ok: true, output: data }); @@ -186,6 +207,25 @@ export class StandardSessionStreamManager implements SessionStreamManager { } } + lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined { + return this.lastDispatchedSeqNums.get(keyFor(sessionId, io)); + } + + setLastDispatchedSeqNum( + sessionId: string, + io: SessionChannelIO, + seqNum: number + ): void { + this.#advanceLastDispatched(keyFor(sessionId, io), seqNum); + } + + #advanceLastDispatched(key: string, seqNum: number): void { + const current = this.lastDispatchedSeqNums.get(key); + if (current === undefined || seqNum > current) { + this.lastDispatchedSeqNums.set(key, seqNum); + } + } + setMinTimestamp( sessionId: string, io: SessionChannelIO, @@ -204,7 +244,15 @@ export class StandardSessionStreamManager implements SessionStreamManager { const buffered = this.buffer.get(key); if (buffered && buffered.length > 0) { buffered.shift(); - if (buffered.length === 0) this.buffer.delete(key); + const seqList = this.bufferSeqNums.get(key); + const shiftedSeqNum = seqList?.shift(); + if (buffered.length === 0) { + this.buffer.delete(key); + this.bufferSeqNums.delete(key); + } + if (shiftedSeqNum !== undefined) { + this.#advanceLastDispatched(key, shiftedSeqNum); + } return true; } return false; @@ -224,6 +272,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.tails.delete(key); } this.buffer.delete(key); + 
this.bufferSeqNums.delete(key); // Reset the backoff counter so a future re-attach starts fresh — // an explicit disconnect is a deliberate teardown, not evidence of // a broken backend. @@ -261,6 +310,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { reset(): void { this.disconnect(); this.seqNums.clear(); + this.lastDispatchedSeqNums.clear(); this.minTimestamps.clear(); this.handlers.clear(); this.reconnectAttempts.clear(); @@ -276,6 +326,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { } this.onceWaiters.clear(); this.buffer.clear(); + this.bufferSeqNums.clear(); } #ensureTailConnected(sessionId: string, io: SessionChannelIO): void { @@ -385,7 +436,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { // keep as string } } - this.#dispatch(key, data); + this.#dispatch(key, data, Number.isFinite(seqNum) ? seqNum : undefined); }, onComplete: () => { if (this.debug) { @@ -410,7 +461,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { } } - #dispatch(key: string, data: unknown): void { + #dispatch(key: string, data: unknown, seqNum: number | undefined): void { // Any record flowing through = healthy connection; reset the backoff // counter so the next disconnect starts fresh. this.reconnectAttempts.delete(key); @@ -423,6 +474,12 @@ export class StandardSessionStreamManager implements SessionStreamManager { if (waiter.signal && waiter.abortHandler) { waiter.signal.removeEventListener("abort", waiter.abortHandler); } + // Record was consumed directly by a waiter — advance the + // committed-consume cursor immediately. Buffered-then-shifted + // records advance the cursor in `once()` / `shiftBuffer()`. 
+ if (seqNum !== undefined) { + this.#advanceLastDispatched(key, seqNum); + } waiter.resolve({ ok: true, output: data }); this.#invokeHandlers(key, data); return; @@ -442,6 +499,14 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.buffer.set(key, buffered); } buffered.push(data); + if (seqNum !== undefined) { + let bufferedSeqs = this.bufferSeqNums.get(key); + if (!bufferedSeqs) { + bufferedSeqs = []; + this.bufferSeqNums.set(key, bufferedSeqs); + } + bufferedSeqs.push(seqNum); + } } #invokeHandlers(key: string, data: unknown): void { diff --git a/packages/core/src/v3/sessionStreams/noopManager.ts b/packages/core/src/v3/sessionStreams/noopManager.ts index c1c3c38dcdf..42d97c9d4ea 100644 --- a/packages/core/src/v3/sessionStreams/noopManager.ts +++ b/packages/core/src/v3/sessionStreams/noopManager.ts @@ -31,6 +31,16 @@ export class NoopSessionStreamManager implements SessionStreamManager { setLastSeqNum(_sessionId: string, _io: SessionChannelIO, _seqNum: number): void {} + lastDispatchedSeqNum(_sessionId: string, _io: SessionChannelIO): number | undefined { + return undefined; + } + + setLastDispatchedSeqNum( + _sessionId: string, + _io: SessionChannelIO, + _seqNum: number + ): void {} + setMinTimestamp( _sessionId: string, _io: SessionChannelIO, diff --git a/packages/core/src/v3/sessionStreams/types.ts b/packages/core/src/v3/sessionStreams/types.ts index 2310fabae25..f59d3ee9c7b 100644 --- a/packages/core/src/v3/sessionStreams/types.ts +++ b/packages/core/src/v3/sessionStreams/types.ts @@ -45,6 +45,28 @@ export interface SessionStreamManager { /** Advance the last-seen sequence number (prevents SSE replay after `.wait` resume). */ setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void; + /** + * Highest sequence number that has been *consumed* on the channel — + * delivered to a `once()` waiter or shifted off the buffer into one. 
+ * Distinct from {@link lastSeqNum}, which advances on every received + * record regardless of whether anything consumed it. Used by + * `chat.agent` to persist the `.in` resume cursor on each + * `turn-complete` control record so the next worker boot can resume + * the channel from this point without replaying processed messages. + */ + lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined; + + /** + * Seed the committed-consume cursor at worker boot — e.g. from the + * `session-in-event-id` header on the latest `turn-complete` on + * `.out`. Monotonic: only ever advances forward, never backwards. + */ + setLastDispatchedSeqNum( + sessionId: string, + io: SessionChannelIO, + seqNum: number + ): void; + /** * Set a per-stream lower-bound SSE timestamp. Records whose timestamp * is `<= minTimestamp` are dropped before dispatch. Used by chat.agent diff --git a/packages/core/src/v3/sessionStreams/wireProtocol.ts b/packages/core/src/v3/sessionStreams/wireProtocol.ts index 9abf8e35c16..550e81a0af4 100644 --- a/packages/core/src/v3/sessionStreams/wireProtocol.ts +++ b/packages/core/src/v3/sessionStreams/wireProtocol.ts @@ -28,6 +28,18 @@ export const PUBLIC_ACCESS_TOKEN_HEADER = "public-access-token" as const; /** Header name carrying the agent's last S2 event id on a handover bridge. */ export const SESSION_STATE_LAST_EVENT_ID_HEADER = "last-event-id" as const; +/** + * Header on `turn-complete` records carrying the highest `session.in` + * seq_num the agent committed to processing during this turn. Read on + * the next worker boot to seed `.in`'s resume cursor — anything past + * this seq is new and gets delivered; anything at-or-before was already + * processed and is skipped. Decimal-string form of the seq_num. + * + * Omitted when no `.in` records have been consumed yet (first turn of a + * fresh chat triggered via the wire payload). 
+ */ +export const SESSION_IN_EVENT_ID_HEADER = "session-in-event-id" as const; + export const TRIGGER_CONTROL_SUBTYPE = { TURN_COMPLETE: "turn-complete", UPGRADE_REQUIRED: "upgrade-required", diff --git a/packages/core/src/v3/test/test-session-stream-manager.ts b/packages/core/src/v3/test/test-session-stream-manager.ts index 493093d686f..a688790d616 100644 --- a/packages/core/src/v3/test/test-session-stream-manager.ts +++ b/packages/core/src/v3/test/test-session-stream-manager.ts @@ -145,6 +145,21 @@ export class TestSessionStreamManager implements SessionStreamManager { this.seqNums.set(keyFor(sessionId, io), seqNum); } + lastDispatchedSeqNum(_sessionId: string, _io: SessionChannelIO): number | undefined { + // The test harness drives records via `__sendFromTest` without seq + // numbers, so the committed-consume cursor stays undefined. Tests + // that need cursor behaviour exercise it via the real manager. + return undefined; + } + + setLastDispatchedSeqNum( + _sessionId: string, + _io: SessionChannelIO, + _seqNum: number + ): void { + // no-op — see comment on `lastDispatchedSeqNum`. + } + setMinTimestamp( _sessionId: string, _io: SessionChannelIO, diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 070a94bc8d0..19f21aaa2c9 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -33,6 +33,7 @@ import { type TaskSchema, type TaskRunContext, type TaskWithSchema, + SESSION_IN_EVENT_ID_HEADER, TRIGGER_CONTROL_SUBTYPE, type WriterStreamOptions, } from "@trigger.dev/core/v3"; @@ -151,33 +152,39 @@ const lastTurnCompleteSeqNumKey = locals.create<{ value: number | undefined }>( /** * Scan `session.out` for the latest `turn-complete` control record and - * return its SSE timestamp. 
Used at OOM-retry boot to derive a - * lower-bound timestamp for the `session.in` filter — records older - * than `T_last_complete` belong to turns that already completed on the - * prior attempt and are dropped before they reach the turn loop. + * return its `session-in-event-id` header value — the committed-consume + * cursor on `.in` as of that turn-complete. Used at worker boot to seed + * the `.in` subscription so already-processed user messages don't get + * replayed from S2. * * Implementation streams the SSE endpoint and listens for `turn-complete` * via the transport's `onControl` callback; the data-chunk for-await is - * just there to drive the stream. The scan is naturally O(1 turn) now - * that `session.out` is bounded to roughly one turn at steady state - * (every successful turn-complete is followed by an S2 trim back to the - * previous one — see `writeTurnCompleteChunk`). - * - * Returns `undefined` if no `turn-complete` control record has been - * written yet (first-turn OOM, no completed turns to dedup against). + * just there to drive the stream. The scan is O(1 turn) because + * `session.out` is bounded to roughly one turn at steady state — every + * successful turn-complete is followed by an S2 trim back to the + * previous one (see `writeTurnCompleteChunk`). + * + * Returns `undefined` if no `turn-complete` carrying the header has been + * written yet — first-turn-ever, first turn post-OOM-with-no-prior-runs, + * or a `turn-complete` written before this header existed (cross-version + * boot). Callers fall back to subscribing `.in` from seq 0 in that case; + * the slim-wire merge handles any dedup against snapshot-restored + * messages. 
* @internal */ -async function findLatestTurnCompleteTimestamp( +async function findLatestSessionInCursor( chatId: string ): Promise { const apiClient = apiClientManager.clientOrThrow(); - let latestTs: number | undefined; + let latestCursor: number | undefined; const stream = await apiClient.subscribeToSessionStream(chatId, "out", { timeoutInSeconds: 1, onControl: (event) => { - if (event.subtype === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { - latestTs = event.timestamp; - } + if (event.subtype !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) return; + const raw = headerValue(event.headers, SESSION_IN_EVENT_ID_HEADER); + if (!raw) return; + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed)) latestCursor = parsed; }, }); // Drain the stream so the underlying SSE reader runs to completion. We @@ -186,7 +193,7 @@ async function findLatestTurnCompleteTimestamp( for await (const _ of stream) { // intentionally empty } - return latestTs; + return latestCursor; } /** @@ -4687,47 +4694,46 @@ function chatAgent< } } - // ── session.in dedup cutoff ──────────────────────────────────── + // ── session.in resume cursor ─────────────────────────────────── // // A fresh worker subscribes to `session.in` from seq 0 and would // re-deliver every record ever appended — including user messages - // from turns already completed on a prior run. Without dedup, the - // loop would re-process them as fresh turns and the slim-wire merge - // would replace-by-id against the snapshot-restored copies, yielding - // no-op replaces while the customer's actual new message waits in - // the queue. + // from turns already completed on a prior run. Without a cursor, + // the loop would re-process them as fresh turns and the slim-wire + // merge would replace-by-id against snapshot-restored copies, + // yielding no-op replaces while the customer's actual new message + // waits in the queue. 
// - // The cutoff is the timestamp of the last `trigger:turn-complete` - // chunk on `session.out`. When we have a snapshot, that timestamp is - // already in `lastOutTimestamp` — use it directly to skip the - // O(stream-length) scan. Fall back to the scan only when no snapshot - // is available (first-ever OOM retry, or `hydrateMessages` - // short-circuited the snapshot read). + // The cursor is the seq_num of the last `.in` record the prior + // worker committed to processing, persisted on each `turn-complete` + // control record as a `session-in-event-id` sibling header. The + // boot scan reads the header off `.out`'s latest turn-complete and + // seeds the manager so the upcoming `.in` SSE subscribe opens with + // `Last-Event-ID: <seq>` — S2 starts after that seq and old + // messages never reach this worker. // - // Applies in three cases (any of which means session.in has records + // Applies in three cases (any of which means `.in` has records // belonging to completed turns the new run should skip): // - OOM retry (`ctx.attempt.number > 1`) // - Continuation run (`payload.continuation === true`) — prior run // crashed / was canceled / requested upgrade // - Snapshot exists at all (catches edge cases where the wire // didn't set `continuation` but a snapshot indicates prior turns) - const needsDedupCutoff = + const needsResumeCursor = ctx.attempt.number > 1 || payload.continuation === true || bootSnapshot !== undefined; - if (needsDedupCutoff) { + if (needsResumeCursor) { try { - let cutoff = bootSnapshot?.lastOutTimestamp; - if (cutoff === undefined) { - cutoff = await findLatestTurnCompleteTimestamp(payload.chatId); - } - if (cutoff !== undefined) { - sessionStreams.setMinTimestamp(payload.chatId, "in", cutoff); + const cursor = await findLatestSessionInCursor(payload.chatId); + if (cursor !== undefined) { + sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); + sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); } } catch (error) 
{ logger.warn( - "chat.agent: session.in dedup cutoff lookup failed; old messages may replay", + "chat.agent: session.in resume cursor lookup failed; old messages may replay", { error: error instanceof Error ? error.message : String(error) } ); } @@ -6431,16 +6437,7 @@ function chatAgent< version: 1, savedAt: Date.now(), messages: accumulatedUIMessages, - // `StreamWriteResult` exposes `lastEventId` only; - // use the snapshot save time as the - // `lastOutTimestamp` cutoff hint. The OOM-retry - // optimization compares this to SSE chunk - // timestamps (ms epoch on the server) — Date.now() - // here is the closest cheap approximation - // available client-side and is consistent with - // the existing turn-complete chunk emission. lastOutEventId: turnCompleteResult?.lastEventId, - lastOutTimestamp: Date.now(), }); }, { @@ -8703,9 +8700,22 @@ async function writeTurnCompleteChunk( // 1. Write the turn-complete control record. The ack's `lastEventId` is // this record's seq_num — that's the trim target for the NEXT turn. - const extraHeaders: ReadonlyArray<[string, string]> = publicAccessToken - ? [["public-access-token", publicAccessToken]] - : []; + // + // Sibling headers: + // - `public-access-token` (optional): refresh token surfaced to + // browser-side transports via `onTurnComplete`. + // - `session-in-event-id` (optional): the committed-consume cursor + // on `.in` as of this turn-complete. On next worker boot, the + // boot scan reads this back and seeds the `.in` subscription so + // already-processed user messages aren't re-delivered. 
+ const extraHeaders: Array<[string, string]> = []; + if (publicAccessToken) { + extraHeaders.push(["public-access-token", publicAccessToken]); + } + const inCursor = session.in.lastDispatchedSeqNum(); + if (inCursor !== undefined) { + extraHeaders.push([SESSION_IN_EVENT_ID_HEADER, String(inCursor)]); + } const result = await session.out.writeControl( TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE, extraHeaders diff --git a/packages/trigger-sdk/src/v3/sessions.ts b/packages/trigger-sdk/src/v3/sessions.ts index 159874a9bf1..663dbbebc30 100644 --- a/packages/trigger-sdk/src/v3/sessions.ts +++ b/packages/trigger-sdk/src/v3/sessions.ts @@ -34,7 +34,7 @@ import { trimSessionStream, writeSessionControlRecord, } from "@trigger.dev/core/v3"; -import type { StreamWriteResult } from "@trigger.dev/core/v3"; +import type { ControlEvent, StreamWriteResult } from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { SpanStatusCode } from "@opentelemetry/api"; import { tracer } from "./tracer.js"; @@ -393,6 +393,7 @@ export class SessionOutputChannel { lastEventId: options?.lastEventId != null ? String(options.lastEventId) : undefined, onPart: options?.onPart, + onControl: options?.onControl, onComplete: options?.onComplete, onError: options?.onError, }); @@ -592,6 +593,20 @@ export class SessionInputChannel { return sessionStreams.peek(this.sessionId, "in") as T | undefined; } + /** + * The highest S2 sequence number of any record this channel has + * delivered to a `once()` / `wait()` consumer (or had shifted off its + * buffer into one). Distinct from "last received" — buffered-but-not- + * yet-consumed records don't count. + * + * Used by `chat.agent` to persist the `.in` resume cursor on each + * `turn-complete` control record, so the next worker boot can subscribe + * past already-processed user messages. 
+ */ + lastDispatchedSeqNum(): number | undefined { + return sessionStreams.lastDispatchedSeqNum(this.sessionId, "in"); + } + /** * Suspend the current run until the next record arrives on `.in`. * Unlike {@link once}, `wait()` frees compute while blocked — the @@ -762,6 +777,13 @@ export type SessionSubscribeOptions = { timeoutInSeconds?: number; /** Called for each SSE event with the full event metadata (id, timestamp). */ onPart?: (part: { id: string; chunk: T; timestamp: number }) => void; + /** + * Called when a `trigger-control` record arrives on the stream (e.g. + * `turn-complete`, `upgrade-required`). Control records are filtered + * out of the consumer chunk stream — handle them here. See + * `docs/ai-chat/client-protocol.mdx` for the wire shape. + */ + onControl?: (event: ControlEvent) => void; /** Called when the server signals end-of-stream. */ onComplete?: () => void; /** Called on unrecoverable errors after the retry budget is exhausted. */ From ffc52b42c8bf9154731201eba6023e5e0d053a62 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 17 May 2026 09:14:51 +0100 Subject: [PATCH 3/4] fix(sdk,webapp): drop stale lastOutTimestamp from snapshot test fixtures vitest was happy with the extra property at runtime so the local suite passed, but tsc on the typecheck job (strict on object literals) caught three test fixtures still setting the field after it was removed from ChatSnapshotV1 in the resume-cursor commit. 
--- apps/webapp/test/chat-snapshot-integration.test.ts | 1 - apps/webapp/test/replay-after-crash.test.ts | 1 - packages/trigger-sdk/test/chat-snapshot.test.ts | 1 - 3 files changed, 3 deletions(-) diff --git a/apps/webapp/test/chat-snapshot-integration.test.ts b/apps/webapp/test/chat-snapshot-integration.test.ts index 3d157d58f9f..1d500e16b90 100644 --- a/apps/webapp/test/chat-snapshot-integration.test.ts +++ b/apps/webapp/test/chat-snapshot-integration.test.ts @@ -49,7 +49,6 @@ function makeSnapshot(opts: { messages?: UIMessage[]; lastOutEventId?: string } }, ], lastOutEventId: opts.lastOutEventId ?? "evt-42", - lastOutTimestamp: 1_700_000_000_500, }; } diff --git a/apps/webapp/test/replay-after-crash.test.ts b/apps/webapp/test/replay-after-crash.test.ts index f5c6842b194..576ced2ab2a 100644 --- a/apps/webapp/test/replay-after-crash.test.ts +++ b/apps/webapp/test/replay-after-crash.test.ts @@ -267,7 +267,6 @@ describe("replay after crash (MinIO + SDK helpers)", () => { { id: "a-1", role: "assistant", parts: [{ type: "text", text: "stale-assistant" }] }, ], lastOutEventId: "evt-prev", - lastOutTimestamp: 1_700_000_000_500, }; // Use the SDK's own writer to lay the snapshot down, then swap diff --git a/packages/trigger-sdk/test/chat-snapshot.test.ts b/packages/trigger-sdk/test/chat-snapshot.test.ts index e7421cdbd9a..cac3364639b 100644 --- a/packages/trigger-sdk/test/chat-snapshot.test.ts +++ b/packages/trigger-sdk/test/chat-snapshot.test.ts @@ -29,7 +29,6 @@ function buildSnapshot(count = 1): ChatSnapshotV1 { parts: [{ type: "text" as const, text: `hello ${i}` }], })), lastOutEventId: "evt-42", - lastOutTimestamp: 2_000_000, }; } From 0dc7142ae024affdbf67704cf434066969d7cb7b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 17 May 2026 10:02:07 +0100 Subject: [PATCH 4/4] fix(core): keep bufferSeqNums in lock-step on session-stream on()-drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `on()`'s buffer-drain block 
called `this.buffer.delete(key)` after handing buffered records to the new handler but didn't delete the parallel `this.bufferSeqNums` entry. The next `#dispatch` that buffered a record would then append onto the orphaned seqNum array, and a subsequent `once()` / `shiftBuffer()` would advance `lastDispatchedSeqNum` with a stale seq — which would land on the next `turn-complete` control record's `session-in-event-id` header and cause the next worker boot to under-skip `.in`. Matches the cleanup pattern already used in shiftBuffer / disconnectStream / reset. --- packages/core/src/v3/sessionStreams/manager.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/core/src/v3/sessionStreams/manager.ts b/packages/core/src/v3/sessionStreams/manager.ts index b6cd6a740de..e9cec675ea0 100644 --- a/packages/core/src/v3/sessionStreams/manager.ts +++ b/packages/core/src/v3/sessionStreams/manager.ts @@ -113,6 +113,10 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.#invokeHandler(handler, data); } this.buffer.delete(key); + // Keep `bufferSeqNums` in lock-step with `buffer` — without this, + // the parallel array desyncs and the next `#dispatch` that buffers + // a record would shift a stale seqNum into `lastDispatchedSeqNum`. + this.bufferSeqNums.delete(key); } return {