diff --git a/apps/webapp/app/components/runs/v3/agent/AgentView.tsx b/apps/webapp/app/components/runs/v3/agent/AgentView.tsx index eee7646d03..cbe5cec8f3 100644 --- a/apps/webapp/app/components/runs/v3/agent/AgentView.tsx +++ b/apps/webapp/app/components/runs/v3/agent/AgentView.tsx @@ -1,5 +1,5 @@ import type { UIMessage } from "@ai-sdk/react"; -import { SSEStreamSubscription } from "@trigger.dev/core/v3"; +import { ChatSnapshotV1Schema, SSEStreamSubscription } from "@trigger.dev/core/v3"; import { useEffect, useMemo, useRef, useState } from "react"; import { Paragraph } from "~/components/primitives/Paragraph"; import { Spinner } from "~/components/primitives/Spinner"; @@ -27,6 +27,15 @@ export type AgentViewAuth = { * channel and is merged in by the AgentView subscription. */ initialMessages: UIMessage[]; + /** + * Presigned GET URL for the session's chat-snapshot S3 blob (written + * by the agent after each turn-complete; see `ChatSnapshotV1`). + * Optional — sessions that registered a `hydrateMessages` hook skip + * snapshot writes and the URL fetch will 404. In that case the + * dashboard falls back to seq=0 SSE (which, post-trim, shows only the + * most recent turn). Generated server-side by `SessionPresenter`. + */ + snapshotPresignedUrl?: string; }; /** @@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) { projectSlug: project.slug, envSlug: environment.slug, initialMessages: agentView.initialMessages, + snapshotPresignedUrl: agentView.snapshotPresignedUrl, }); // Sticky-bottom auto-scroll: walks up to find the inspector's scroll @@ -120,6 +130,11 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) { * - `kind: "stop"` is a stop signal — no messages, nothing to render * here, so it's filtered. * + * Wire payloads are slim-wire (one new UIMessage per record, on + * `payload.message`). The legacy `payload.messages` array shape is kept + * here as a fallback so any historical records on a long-lived session + * still render. + * * The server wraps records in `{data, id}` and writes `data` as a JSON * string; SSE v2 delivers the parsed string back. {@link parseChunkPayload} * re-parses to recover the object. @@ -127,6 +142,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) { type InputStreamChunk = { kind?: "message" | "stop"; payload?: { + message?: { id?: string; role?: string; parts?: unknown[] }; messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>; trigger?: string; }; @@ -217,6 +233,7 @@ function useAgentSessionMessages({ projectSlug, envSlug, initialMessages, + snapshotPresignedUrl, }: { sessionId: string; apiOrigin: string; @@ -224,6 +241,7 @@ function useAgentSessionMessages({ projectSlug: string; envSlug: string; initialMessages: UIMessage[]; + snapshotPresignedUrl?: string; }): UIMessage[] { // Seed with the user messages from the run's task payload. const seedMessages = useMemo( @@ -285,6 +303,56 @@ function useAgentSessionMessages({ const outputUrl = `${sessionBase}/out`; const inputUrl = `${sessionBase}/in`; + /** + * Try to seed `pendingRef` from the agent's S3 snapshot blob and return + * the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes + * just past the snapshot. Returns undefined for sessions that don't + * have a snapshot (e.g. `hydrateMessages` customers, or sessions that + * have never completed a turn). + */ + const loadSnapshot = async (): Promise => { + if (!snapshotPresignedUrl) return undefined; + try { + const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal }); + if (!resp.ok) return undefined; + const json = (await resp.json()) as unknown; + const parsed = ChatSnapshotV1Schema.safeParse(json); + if (!parsed.success) return undefined; + const snapshot = parsed.data; + // Preserve the snapshot's array order in the final render by + // giving each message a unique, monotonically increasing + // timestamp from `(savedAt - count + index)`. Real chunk + // timestamps from the SSE path use S2 arrival ms (positive + // numbers in the present), so anything below `savedAt` sorts + // before live chunks while preserving snapshot order among + // themselves. + const count = snapshot.messages.length; + snapshot.messages.forEach((raw, i) => { + const message = raw as UIMessage; + if (!message?.id) return; + // The snapshot's seed wins over the task-payload seed for any + // overlapping ids (the snapshot represents the agent's + // canonical accumulator, post-turn). + pendingRef.current.set(message.id, message); + if (!timestampsRef.current.has(message.id)) { + timestampsRef.current.set(message.id, snapshot.savedAt - count + i); + } + }); + scheduleFlush.current(); + return snapshot.lastOutEventId; + } catch { + // 404 / network / parse / abort — fall back to seq=0 SSE + return undefined; + } + }; + + const outputSubOptions = (lastEventId: string | undefined) => + ({ + signal: abort.signal, + timeoutInSeconds: 120, + ...(lastEventId !== undefined ? { lastEventId } : {}), + }) as const; + const commonSubOptions = { signal: abort.signal, timeoutInSeconds: 120, @@ -292,20 +360,35 @@ function useAgentSessionMessages({ // ---- Output stream: assistant messages --------------------------------- // - // The output stream delivers UIMessageChunks interleaved with - // Trigger-specific control chunks (`trigger:turn-complete`, etc.). We - // filter the control chunks and fold everything else into an assistant - // `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's - // `readUIMessageStream` helper is only available in `ai@6`, and the - // webapp is pinned to `ai@4`, so we re-implement just the chunk types - // that `renderPart` actually displays. + // The output stream delivers data records (UIMessageChunks) interleaved + // with Trigger control records (`turn-complete`, `upgrade-required`) and + // S2 command records (`trim`). Control + command records ride on + // `record.headers` with empty bodies; the SSE parser strips S2 command + // records entirely, and control records arrive with `value.chunk === + // undefined`, which `parseChunkPayload` drops below. + // + // We fold everything else into an assistant `UIMessage` via our own + // `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream` + // helper is only available in `ai@6`, and the webapp is pinned to + // `ai@4`, so we re-implement just the chunk types that `renderPart` + // actually displays. // // We capture the **server timestamp of each assistant message's first // `start` chunk** so later sort-by-timestamp merges with the input // stream correctly. const runOutput = async () => { try { - const sub = new SSEStreamSubscription(outputUrl, commonSubOptions); + // Seed messages from the snapshot first (if available), then + // resume the SSE from the snapshot's last event id so we don't + // re-stream chunks already represented in the snapshot. If no + // snapshot exists (no URL, 404, parse failure), the SSE opens + // at seq=0 — which, post-trim, contains roughly one turn of + // records (acceptable fallback for `hydrateMessages` sessions + // and fresh sessions). + const snapshotLastEventId = await loadSnapshot(); + if (abort.signal.aborted) return; + + const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId)); const raw = await sub.subscribe(); const reader = raw.getReader(); @@ -318,6 +401,12 @@ function useAgentSessionMessages({ const chunk = parseChunkPayload(value.chunk) as OutputChunk | null; if (!chunk || typeof chunk.type !== "string") continue; + // Legacy belt-and-suspenders: prior versions of the SDK + // emitted `trigger:turn-complete` / `trigger:upgrade-required` + // as data records (`type` field). Current versions use + // header-form control records, which `parseChunkPayload` + // drops above. Keep this filter to handle any in-flight + // sessions whose `.out` was populated by the older SDK. if (chunk.type.startsWith("trigger:")) continue; if (chunk.type === "start") { @@ -413,9 +502,18 @@ function useAgentSessionMessages({ const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null; if (!chunk || chunk.kind !== "message") continue; const payload = chunk.payload; - if (!payload || !Array.isArray(payload.messages)) continue; - - const incomingUsers = payload.messages.filter( + if (!payload) continue; + + // Slim-wire is one UIMessage on `payload.message`; legacy + // payloads carried an array on `payload.messages`. Accept + // either so historical records on a long-lived session still + // render. + const candidates = Array.isArray(payload.messages) + ? payload.messages + : payload.message + ? [payload.message] + : []; + const incomingUsers = candidates.filter( (m): m is UIMessage => m != null && (m as { role?: string }).role === "user" && typeof m.id === "string" ); @@ -454,7 +552,7 @@ function useAgentSessionMessages({ pendingTimerRef.current = null; } }; - }, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]); + }, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]); return useMemo(() => { const timestamps = timestampsRef.current; diff --git a/apps/webapp/app/presenters/v3/SessionPresenter.server.ts b/apps/webapp/app/presenters/v3/SessionPresenter.server.ts index 27807971d5..4d75abb85b 100644 --- a/apps/webapp/app/presenters/v3/SessionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SessionPresenter.server.ts @@ -1,8 +1,11 @@ import { type Span } from "@opentelemetry/api"; +import { chatSnapshotKeySuffix } from "@trigger.dev/core/v3"; import { type PrismaClientOrTransaction } from "@trigger.dev/database"; import { env } from "~/env.server"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; +import { logger } from "~/services/logger.server"; +import { generatePresignedUrl } from "~/v3/objectStore.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { startActiveSpan } from "~/v3/tracer.server"; @@ -15,6 +18,8 @@ export class SessionPresenter { userId: string; environmentId: string; sessionParam: string; + projectExternalRef: string; + environmentSlug: string; }) { return startActiveSpan( "SessionPresenter.call", @@ -33,10 +38,14 @@ export class SessionPresenter { userId, environmentId, sessionParam, + projectExternalRef, + environmentSlug, }: { userId: string; environmentId: string; sessionParam: string; + projectExternalRef: string; + environmentSlug: string; }, rootSpan: Span ) { @@ -112,6 +121,48 @@ export class SessionPresenter { // unused — kept here to match the existing `AgentViewAuth` shape. const addressingKey = session.externalId ?? session.friendlyId; + // Presign a GET URL for the agent's S3 snapshot blob. The browser + // fetches it directly, parses + validates, and seeds the + // TriggerChatTransport with the full history + lastEventId before + // opening the SSE. Presign succeeds regardless of whether the blob + // exists; the frontend handles 404 gracefully. + // + // Snapshots are only written when no `hydrateMessages` hook is + // registered — sessions that use `hydrateMessages` will 404 here + // and the dashboard falls back to seq=0 SSE (which, post-trim, + // shows only the most recent turn — accepted, those customers + // have their own DB-backed dashboards). + // The agent writes snapshots keyed on the session's friendlyId (the + // `session_*` form), which matches what the SDK's `chat.agent` payload + // carries as `sessionId`. Use the same key shape here so the dashboard + // hits the same S3 object. + let snapshotPresignedUrl: string | undefined; + try { + const signed = await startActiveSpan( + "SessionPresenter.presignSnapshot", + async () => + generatePresignedUrl( + projectExternalRef, + environmentSlug, + chatSnapshotKeySuffix(session.friendlyId), + "GET" + ) + ); + if (signed.success) { + snapshotPresignedUrl = signed.url; + } else { + logger.warn("SessionPresenter: snapshot presign failed", { + sessionId: session.id, + error: signed.error, + }); + } + } catch (error) { + logger.warn("SessionPresenter: snapshot presign threw", { + sessionId: session.id, + error: error instanceof Error ? error.message : String(error), + }); + } + return { id: session.id, friendlyId: session.friendlyId, @@ -147,6 +198,7 @@ export class SessionPresenter { apiOrigin: env.API_ORIGIN || env.LOGIN_ORIGIN, sessionId: addressingKey, initialMessages: [], + snapshotPresignedUrl, }, }; } diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx index 496a5fb629..c873dd9f40 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx @@ -79,6 +79,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { userId, environmentId: environment.id, sessionParam, + projectExternalRef: project.externalRef, + environmentSlug: environment.slug, }); if (!session) { diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 4c735d21d4..0553ef77f9 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -2,6 +2,7 @@ import type { UnkeyCache } from "@internal/cache"; import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions } from "./types"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; +import { headerValue } from "@trigger.dev/core/v3"; import { randomUUID } from "node:crypto"; export type S2RealtimeStreamsOptions = { @@ -258,15 +259,37 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { if (eventType === "batch" && data) { try { const parsed = JSON.parse(data) as { - records: Array<{ body: string; seq_num: number; timestamp: number }>; + records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; }; for (const record of parsed.records) { - const parsedBody = JSON.parse(record.body) as { data: string; id: string }; + // S2 command records (trim/fence) have a single header with + // empty name. Skip — callers want only data + Trigger control + // records. + if (record.headers?.[0]?.[0] === "") { + continue; + } + + // Data records carry a JSON envelope; Trigger control records + // have an empty body and route via headers. Tolerate non-JSON + // bodies so a control record (or a malformed data record) + // doesn't take the whole batch down with it. + let parsedBody: { data: string; id: string } | undefined; + try { + parsedBody = JSON.parse(record.body) as { data: string; id: string }; + } catch { + parsedBody = undefined; + } records.push({ - data: parsedBody.data, - id: parsedBody.id, + data: parsedBody?.data ?? "", + id: parsedBody?.id ?? "", seqNum: record.seq_num, + headers: record.headers, }); } } catch { @@ -294,13 +317,19 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { * Serve SSE from a `Session`-primitive channel addressed by * `(friendlyId, io)`. * - * For `io=out`, peek the tail record first. If it's - * `trigger:turn-complete`, the agent has finished a turn and is - * either idle-waiting on `.in` or has exited — either way, no more - * chunks will arrive without further user action. We switch the - * downstream S2 read to `wait=0` (drain whatever's left, close fast) - * and set `X-Session-Settled: true` so the client knows this SSE - * close is terminal instead of the normal 60s long-poll cycle. + * For `io=out`, peek the tail of the stream. If the most recent + * non-command record is a `turn-complete` control record (i.e. the + * agent has finished a turn and is either idle-waiting on `.in` or + * has exited), no more chunks will arrive without further user + * action. We switch the downstream S2 read to `wait=0` (drain + * whatever's left, close fast) and set `X-Session-Settled: true` so + * the client knows this SSE close is terminal instead of the normal + * 60s long-poll cycle. + * + * The actual tail is now usually an S2 `trim` command record (the + * agent appends one after every turn-complete to keep `.out` + * bounded). The peek reads two records and walks past the trim to + * find the turn-complete underneath. * * Mid-turn tail (streaming UIMessageChunk) falls through to the * long-poll path; a crashed-mid-turn stream is indistinguishable @@ -324,13 +353,8 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { // races the newly-triggered turn's first chunk and the SSE closes // before records land. if (io === "out" && options?.peekSettled) { - const lastChunk = await this.#peekLastChunkBody(s2Stream); - const lastChunkType = - lastChunk != null && typeof lastChunk === "object" - ? (lastChunk as { type?: unknown }).type - : null; - if (lastChunkType === "trigger:turn-complete") { - settled = true; + settled = await this.#peekIsSettled(s2Stream); + if (settled) { waitSeconds = 0; } } @@ -351,13 +375,21 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { }); } - async #peekLastChunkBody(s2Stream: string): Promise { + /** + * Peek the tail of `.out` and return whether the stream is "settled" — + * i.e. the most recent non-command record is a `turn-complete` control + * record. The agent appends an S2 `trim` command record immediately + * after every turn-complete to keep the stream bounded, so we read two + * tail records and walk past any trim command to find the + * turn-complete underneath. + */ + async #peekIsSettled(s2Stream: string): Promise { const qs = new URLSearchParams(); - // `tail_offset=1` reads one record before the next seq — i.e. the - // most recently appended record. `count=1` caps it to just that - // record. `wait=0` returns immediately with no long-poll. - qs.set("tail_offset", "1"); - qs.set("count", "1"); + // `tail_offset=2` rewinds two seq positions; `count=2` caps it to + // those two records. At steady state these are `[turn-complete, trim]`. + // `wait=0` returns immediately with no long-poll. + qs.set("tail_offset", "2"); + qs.set("count", "2"); qs.set("wait", "0"); let res: Response; @@ -376,13 +408,13 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { ); } catch (err) { this.logger.warn("S2 peek last record: fetch failed", { err, stream: s2Stream }); - return null; + return false; } if (!res.ok) { // 404: stream has never been written to. 416: range not // satisfiable (empty stream). Both mean "nothing to peek." - if (res.status === 404 || res.status === 416) return null; + if (res.status === 404 || res.status === 416) return false; const text = await res.text().catch(() => ""); this.logger.warn("S2 peek last record failed", { status: res.status, @@ -390,32 +422,43 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { text, stream: s2Stream, }); - return null; + return false; } + let records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; try { const json = (await res.json()) as { - records?: Array<{ body: string; seq_num: number; timestamp: number }>; + records?: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; }; - const record = json.records?.[0]; - if (!record) return null; - // The record body is a JSON string `{data: , id: partId}`. - // The agent-side writer (`StreamsWriterV2`) hands `appendPart` an - // already-JSON-stringified chunk, so `data` round-trips as a string, - // not an object. Parse it once more to surface the chunk shape. - const envelope = JSON.parse(record.body) as { data: unknown; id: string }; - if (typeof envelope.data === "string") { - try { - return JSON.parse(envelope.data); - } catch { - return envelope.data; - } - } - return envelope.data; + records = json.records ?? []; } catch (err) { this.logger.warn("S2 peek last record: parse failed", { err, stream: s2Stream }); - return null; + return false; + } + + // Walk from most-recent backward, skipping S2 command records + // (`headers[0][0] === ""`). The first non-command record is the + // real tail — settled iff its `trigger-control` header is + // `turn-complete`. + for (let i = records.length - 1; i >= 0; i--) { + const record = records[i]!; + if (record.headers?.[0]?.[0] === "") { + continue; + } + const controlValue = headerValue(record.headers, "trigger-control"); + return controlValue === "turn-complete"; } + return false; } async #streamResponseByName( @@ -548,7 +591,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { basins: { exact: this.basin, }, - ops: ["append", "create-stream"], + // S2 treats `trim` as a separate op from `append` even though + // trim records are appended like any other record. Verified + // empirically: without `"trim"` here, `AppendRecord.trim()` + // writes 403 with "Operation not permitted". `chat.agent`'s + // per-turn trim chain depends on this. + ops: ["append", "create-stream", "trim"], streams: { prefix: this.streamPrefix, }, diff --git a/apps/webapp/app/services/realtime/types.ts b/apps/webapp/app/services/realtime/types.ts index 7161f158a4..f09507997a 100644 --- a/apps/webapp/app/services/realtime/types.ts +++ b/apps/webapp/app/services/realtime/types.ts @@ -2,6 +2,13 @@ export type StreamRecord = { data: string; id: string; seqNum: number; + /** + * S2 record headers, when the underlying backend is the v2 (S2) shape. + * Undefined or empty for run-scoped Redis streams. First-header empty-name + * is an S2 command record (trim/fence); the parser strips those before + * surfacing the record, so callers never see them. + */ + headers?: Array<[string, string]>; }; // Interface for stream ingestion @@ -36,8 +43,10 @@ export type StreamResponseOptions = { /** * Session-stream-only. When `true`, the responder MAY peek the tail * of `.out` and short-circuit to `wait=0` + `X-Session-Settled: true` - * if the last chunk is a terminal marker (e.g. `trigger:turn-complete`). - * Used by `TriggerChatTransport.reconnectToStream` on page reload. + * if the last record is a terminal marker (a `trigger-control` + * `turn-complete` control record, ignoring any trailing S2 trim + * command record). Used by `TriggerChatTransport.reconnectToStream` + * on page reload. * * When absent/false, the responder keeps the unconditional long-poll * behavior — required on the active send-a-message path where the diff --git a/apps/webapp/test/chat-snapshot-integration.test.ts b/apps/webapp/test/chat-snapshot-integration.test.ts index 3d157d58f9..1d500e16b9 100644 --- a/apps/webapp/test/chat-snapshot-integration.test.ts +++ b/apps/webapp/test/chat-snapshot-integration.test.ts @@ -49,7 +49,6 @@ function makeSnapshot(opts: { messages?: UIMessage[]; lastOutEventId?: string } }, ], lastOutEventId: opts.lastOutEventId ?? "evt-42", - lastOutTimestamp: 1_700_000_000_500, }; } diff --git a/apps/webapp/test/replay-after-crash.test.ts b/apps/webapp/test/replay-after-crash.test.ts index f5c6842b19..576ced2ab2 100644 --- a/apps/webapp/test/replay-after-crash.test.ts +++ b/apps/webapp/test/replay-after-crash.test.ts @@ -267,7 +267,6 @@ describe("replay after crash (MinIO + SDK helpers)", () => { { id: "a-1", role: "assistant", parts: [{ type: "text", text: "stale-assistant" }] }, ], lastOutEventId: "evt-prev", - lastOutTimestamp: 1_700_000_000_500, }; // Use the SDK's own writer to lay the snapshot down, then swap diff --git a/packages/cli-v3/src/mcp/tools/agentChat.ts b/packages/cli-v3/src/mcp/tools/agentChat.ts index 27965c06b3..078df1c46b 100644 --- a/packages/cli-v3/src/mcp/tools/agentChat.ts +++ b/packages/cli-v3/src/mcp/tools/agentChat.ts @@ -1,5 +1,10 @@ import { z } from "zod"; -import { ApiClient, SSEStreamSubscription } from "@trigger.dev/core/v3"; +import { + ApiClient, + controlSubtype, + SSEStreamSubscription, + TRIGGER_CONTROL_SUBTYPE, +} from "@trigger.dev/core/v3"; import { toolsMetadata } from "../config.js"; import { CommonProjectsInput } from "../schemas.js"; import { respondWithError, toolHandler } from "../utils.js"; @@ -390,50 +395,54 @@ async function collectAgentResponse( session.lastEventId = value.id; } + // Trigger control records (turn-complete, upgrade-required) ride + // on headers — see `client-protocol.mdx#records-on-session-out`. + // Data records carry UIMessageChunks on `value.chunk`. + const controlValue = controlSubtype(value.headers); + + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + break; + } + + if (controlValue === TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED) { + // Agent requested upgrade — trigger continuation. Same session, + // new run — reuse sessionId, swap runId. Slim-wire: ship only + // the latest user message as the turn-N delta; prior turns + // come back via snapshot+replay on the new run's boot. + const lastUserMessage = [...session.messages] + .reverse() + .find((m) => m.role === "user"); + const previousRunId = session.runId; + const result = await session.apiClient.triggerTask(session.agentId, { + payload: { + message: lastUserMessage, + chatId: session.chatId, + sessionId: session.sessionId, + trigger: "submit-message", + metadata: session.clientData, + continuation: true, + previousRunId, + }, + options: { + payloadType: "application/json", + tags: [`chat:${session.chatId}`], + }, + }); + session.runId = result.id; + // Keep session.lastEventId pointing at the upgrade-required + // record's seq (set above when the part arrived). The recursive + // subscribe resumes right after that marker, so we don't replay + // the entire session.out stream — which would hit a historical + // turn-complete and break the loop with empty/old text. The outer + // `finally` block releases the reader before the recursion runs. + return collectAgentResponse(session, depth + 1); + } + // v2 (session) SSE already parses record.body.data, so `chunk` is // the UIMessageChunk object written by the agent. if (value.chunk != null && typeof value.chunk === "object") { const chunk = value.chunk as Record; - if (chunk.type === "trigger:turn-complete") { - break; - } - - if (chunk.type === "trigger:upgrade-required") { - // Agent requested upgrade — trigger continuation. Same session, - // new run — reuse sessionId, swap runId. Slim-wire: ship only - // the latest user message as the turn-N delta; prior turns - // come back via snapshot+replay on the new run's boot. - const lastUserMessage = [...session.messages] - .reverse() - .find((m) => m.role === "user"); - const previousRunId = session.runId; - const result = await session.apiClient.triggerTask(session.agentId, { - payload: { - message: lastUserMessage, - chatId: session.chatId, - sessionId: session.sessionId, - trigger: "submit-message", - metadata: session.clientData, - continuation: true, - previousRunId, - }, - options: { - payloadType: "application/json", - tags: [`chat:${session.chatId}`], - }, - }); - session.runId = result.id; - // Keep session.lastEventId pointing at the trigger:upgrade-required - // chunk's id (set at line 370 when the chunk arrived). The recursive - // subscribe resumes right after that marker, so we don't replay the - // entire session.out stream — which would hit a historical - // trigger:turn-complete and break the loop with empty/old text. - reader.releaseLock(); - // Recurse — subscribe to the new run's stream (same session.out URL) - return collectAgentResponse(session, depth + 1); - } - if (chunk.type === "text-delta" && typeof chunk.delta === "string") { text += chunk.delta; // Accumulate into a text part diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index b304300f14..032e88ea66 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -117,6 +117,10 @@ import { RealtimeRunSkipColumns, type SSEStreamPart, } from "./runStream.js"; +import { + controlSubtype, + type ControlEvent, +} from "../sessionStreams/wireProtocol.js"; import { CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, @@ -1308,6 +1312,12 @@ export class ApiClient { onError?: (error: Error) => void; lastEventId?: string; onPart?: (part: SSEStreamPart) => void; + /** + * Fires when a `trigger-control` record arrives on the stream (e.g. + * `turn-complete`, `upgrade-required`). The control record is never + * enqueued into the consumer stream — handle the event here. + */ + onControl?: (event: ControlEvent) => void; } ): Promise> { const url = `${options?.baseUrl ?? this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}`; @@ -1323,13 +1333,29 @@ export class ApiClient { const stream = await subscription.subscribe(); const onPart = options?.onPart; + const onControl = options?.onControl; return stream.pipeThrough( new TransformStream({ - transform(chunk, controller) { - const data = chunk.chunk as T; - onPart?.(chunk as SSEStreamPart); - controller.enqueue(data); + transform(part, controller) { + // Always surface the raw part via onPart so cursor tracking + // (lastSeqNum, lastEventId) stays correct for both data and + // control records. + onPart?.(part as SSEStreamPart); + + // Trigger control record — route to onControl, never enqueue. + const subtype = controlSubtype(part.headers); + if (subtype) { + onControl?.({ + subtype, + headers: part.headers ?? [], + seqNum: Number.parseInt(part.id, 10) || 0, + timestamp: part.timestamp, + }); + return; + } + + controller.enqueue(part.chunk as T); }, }) ); diff --git a/packages/core/src/v3/apiClient/runStream.test.ts b/packages/core/src/v3/apiClient/runStream.test.ts index a91e70c6e5..4ac2880976 100644 --- a/packages/core/src/v3/apiClient/runStream.test.ts +++ b/packages/core/src/v3/apiClient/runStream.test.ts @@ -442,3 +442,157 @@ describe("SSEStreamSubscription retry behavior", () => { expect(max - min).toBeGreaterThan(2); }); }); + +describe("SSEStreamSubscription v2 batch parsing — record kinds", () => { + const originalFetch = globalThis.fetch; + + afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + }); + + type ParsedPart = { id: string; chunk: unknown; headers?: ReadonlyArray }; + + // Build a v2 batch SSE response with the given records and close. + function makeBatchResponse( + records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }> + ) { + const body = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode(`event: batch\ndata: ${JSON.stringify({ records })}\n\n`) + ); + controller.close(); + }, + }); + return new Response(body, { + status: 200, + headers: { "Content-Type": "text/event-stream", "X-Stream-Version": "v2" }, + }); + } + + async function drain(stream: ReadableStream) { + const reader = stream.getReader(); + const parts: ParsedPart[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) { + reader.releaseLock(); + return parts; + } + parts.push(value as ParsedPart); + } + } + + it("data records flow through with headers and parsed body", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: JSON.stringify({ data: { type: "text-delta", delta: "hi" }, id: "p1" }), + seq_num: 5, + timestamp: 1700000000000, + headers: [], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + expect(parts).toHaveLength(1); + expect(parts[0]!.id).toBe("5"); + expect(parts[0]!.chunk).toEqual({ type: "text-delta", delta: "hi" }); + expect(parts[0]!.headers).toEqual([]); + }); + + it("S2 command records (empty-name header) are filtered out", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: JSON.stringify({ data: { type: "text-delta", delta: "before" }, id: "p1" }), + seq_num: 4, + timestamp: 1700000000000, + headers: [], + }, + // Trim command record — empty-name header, opaque body. + { + body: "AAAAAAAAAAQ=", + seq_num: 5, + timestamp: 1700000000001, + headers: [["", "trim"]], + }, + { + body: JSON.stringify({ data: { type: "text-delta", delta: "after" }, id: "p2" }), + seq_num: 6, + timestamp: 1700000000002, + headers: [], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + // Trim record stripped — only the two data records survive. + expect(parts).toHaveLength(2); + expect((parts[0]!.chunk as any).delta).toBe("before"); + expect((parts[1]!.chunk as any).delta).toBe("after"); + }); + + it("trigger-control records flow with headers and undefined chunk", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: "", + seq_num: 7, + timestamp: 1700000000003, + headers: [ + ["trigger-control", "turn-complete"], + ["public-access-token", "eyJ..."], + ], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + // Control record passes through so consumers can route by header, + // but its `chunk` is undefined (empty body). + expect(parts).toHaveLength(1); + expect(parts[0]!.chunk).toBeUndefined(); + expect(parts[0]!.headers).toEqual([ + ["trigger-control", "turn-complete"], + ["public-access-token", "eyJ..."], + ]); + }); + + it("malformed data record body does not crash; cursor still advances", async () => { + globalThis.fetch = vi.fn().mockResolvedValue( + makeBatchResponse([ + { + body: "not json at all", + seq_num: 8, + timestamp: 1700000000004, + headers: [], + }, + { + body: JSON.stringify({ data: { type: "text-delta", delta: "x" }, id: "p3" }), + seq_num: 9, + timestamp: 1700000000005, + headers: [], + }, + ]) + ); + const sub = new SSEStreamSubscription("http://x", { maxRetries: 0 }); + const parts = await sub.subscribe().then(drain); + + // Malformed record still propagates with undefined chunk (matches + // control-record shape); next data record is fine. + expect(parts).toHaveLength(2); + expect(parts[0]!.chunk).toBeUndefined(); + expect((parts[1]!.chunk as any).delta).toBe("x"); + }); +}); diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 2152c6c69c..217b7a5108 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -176,6 +176,15 @@ export type SSEStreamPart = { id: string; chunk: TChunk; timestamp: number; + /** + * S2 record headers, when the underlying transport is the v2 batch shape + * (Session streams). Undefined for v1 streams. Empty array when the record + * had no headers. Trigger control records carry a `trigger-control` named + * header (see `trigger-control` records on `session.out`) and may reach + * this struct. S2 command records (trim/fence) are identified by an + * empty-name first header and are filtered out before enqueue. + */ + headers?: Array<[string, string]>; }; // Real implementation for production @@ -374,18 +383,47 @@ export class SSEStreamSubscription implements StreamSubscription { } else { if (chunk.event === "batch") { const data = safeParseJSON(chunk.data) as { - records: Array<{ body: string; seq_num: number; timestamp: number }>; + records: Array<{ + body: string; + seq_num: number; + timestamp: number; + headers?: Array<[string, string]>; + }>; }; + if (!data || !Array.isArray(data.records)) return; for (const record of data.records) { + // Always advance the resume cursor — even for records we + // skip — so a future Last-Event-ID reconnect lands past + // them. this.lastEventId = record.seq_num.toString(); - const parsedBody = safeParseJSON(record.body) as { data: unknown; id: string }; - if (seenIds.has(parsedBody.id)) continue; - seenIds.add(parsedBody.id); + + // S2 command records (trim, fence) have a single header + // with an empty name. They are S2-interpreted directives + // that consume a seq_num but are not application data. + // Skip enqueue; consumers shouldn't see them. + if (record.headers?.[0]?.[0] === "") { + continue; + } + + // Data record (and Trigger control records — see + // `trigger-control` header in `client-protocol.mdx`). + // Control records have an empty body; data records have a + // JSON envelope. `safeParseJSON("")` returns undefined, + // which is what we want for control records — downstream + // consumers route by `headers` and ignore `chunk`. + const parsedBody = safeParseJSON(record.body) as + | { data: unknown; id: string } + | undefined; + if (parsedBody?.id) { + if (seenIds.has(parsedBody.id)) continue; + seenIds.add(parsedBody.id); + } chunkController.enqueue({ id: record.seq_num.toString(), - chunk: parsedBody.data, + chunk: parsedBody?.data, timestamp: record.timestamp, + headers: record.headers ?? [], }); } } diff --git a/packages/core/src/v3/realtime-streams-api.ts b/packages/core/src/v3/realtime-streams-api.ts index e873413e2c..d9cd9ecfb4 100644 --- a/packages/core/src/v3/realtime-streams-api.ts +++ b/packages/core/src/v3/realtime-streams-api.ts @@ -7,3 +7,9 @@ export const realtimeStreams = RealtimeStreamsAPI.getInstance(); export * from "./realtimeStreams/types.js"; export { SessionStreamInstance } from "./realtimeStreams/sessionStreamInstance.js"; export type { SessionStreamInstanceOptions } from "./realtimeStreams/sessionStreamInstance.js"; +export { + trimSessionStream, + writeSessionControlRecord, + writeTurnCompleteRecord, + writeUpgradeRequiredRecord, +} from "./realtimeStreams/sessionStreamOneshot.js"; diff --git a/packages/core/src/v3/realtimeStreams/index.ts b/packages/core/src/v3/realtimeStreams/index.ts index 80c44f5a3d..b1c2073580 100644 --- a/packages/core/src/v3/realtimeStreams/index.ts +++ b/packages/core/src/v3/realtimeStreams/index.ts @@ -11,6 +11,12 @@ import { // into the core package's internals. export { SessionStreamInstance } from "./sessionStreamInstance.js"; export type { SessionStreamInstanceOptions } from "./sessionStreamInstance.js"; +export { + trimSessionStream, + writeSessionControlRecord, + writeTurnCompleteRecord, + writeUpgradeRequiredRecord, +} from "./sessionStreamOneshot.js"; const API_NAME = "realtime-streams"; diff --git a/packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts b/packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts new file mode 100644 index 0000000000..9aa25fa82d --- /dev/null +++ b/packages/core/src/v3/realtimeStreams/sessionStreamOneshot.ts @@ -0,0 +1,136 @@ +import { AppendInput, AppendRecord, S2 } from "@s2-dev/streamstore"; +import type { ApiClient } from "../apiClient/index.js"; +import { + TRIGGER_CONTROL_HEADER, + TRIGGER_CONTROL_SUBTYPE, + type TriggerControlSubtype, +} from "../sessionStreams/wireProtocol.js"; +import type { StreamWriteResult } from "./types.js"; + +/** + * One-shot S2 writes against a Session channel. Used for Trigger control + * records (turn-complete, upgrade-required) and S2 command records (trim). + * + * These differ from the streaming writer (`SessionStreamInstance` / + * `StreamsWriterV2`) in two ways: they emit a single record per call, and + * they need precise control over the record's `headers` + `body` shape — + * which the streaming writer's JSON-envelope encoding doesn't expose. + * + * Each call fetches a fresh S2 access token via `initializeSessionStream` + * and opens a new client. Cheap enough at the rate these are emitted + * (~one of each per turn). + */ + +type IO = "out" | "in"; + +async function getS2Stream(apiClient: ApiClient, sessionId: string, io: IO) { + const response = await apiClient.initializeSessionStream(sessionId, io); + const headers = response.headers ?? {}; + const accessToken = headers["x-s2-access-token"]; + const basin = headers["x-s2-basin"]; + const streamName = headers["x-s2-stream-name"]; + const endpoint = headers["x-s2-endpoint"]; + + if (!accessToken || !basin || !streamName) { + throw new Error( + "Session stream initialize did not return S2 credentials — server may be configured for v1 realtime streams, which sessions do not support." + ); + } + + const s2 = new S2({ + accessToken, + ...(endpoint + ? { + endpoints: { + account: endpoint, + basin: endpoint, + }, + } + : {}), + }); + + return s2.basin(basin).stream(streamName); +} + +/** + * Append a single Trigger control record to a Session channel. The record + * carries a `trigger-control` header valued with `subtype`, plus any + * sibling headers (e.g. `public-access-token` on `turn-complete`). Body is + * always empty — control semantics live in the headers. + * + * Returns the ack's last seq_num as `lastEventId`, useful for trim chains. + */ +export async function writeSessionControlRecord( + apiClient: ApiClient, + sessionId: string, + io: IO, + subtype: TriggerControlSubtype | string, + extraHeaders?: ReadonlyArray +): Promise { + const stream = await getS2Stream(apiClient, sessionId, io); + const headers: ReadonlyArray = [ + [TRIGGER_CONTROL_HEADER, subtype], + ...(extraHeaders ?? []), + ]; + const record = AppendRecord.string({ body: "", headers }); + const ack = await stream.append(AppendInput.create([record])); + // S2's `AppendAck.start` is the seq_num of the FIRST record in the batch + // (inclusive); `end` is the seq AFTER the last record (exclusive, equal + // to `tail`). For a single-record append they differ by one — `start` is + // the seq we just wrote, `end` is the next vacant seq. Return `start` + // here so the caller can chain trims against the actual record seq. + return { lastEventId: ack.start.seqNum.toString() }; +} + +/** + * Append an S2 `trim` command record to `session.out`, setting the new + * earliest-readable seq_num. Idempotent and monotonic at S2 — the + * effective trim point is `max(existing, min(provided, current_tail))`. + * + * Used after every `turn-complete` to keep `session.out` bounded to + * approximately one turn of records at steady state. + */ +export async function trimSessionStream( + apiClient: ApiClient, + sessionId: string, + earliestSeqNum: number +): Promise { + const stream = await getS2Stream(apiClient, sessionId, "out"); + await stream.append(AppendInput.create([AppendRecord.trim(earliestSeqNum)])); +} + +/** + * Convenience: append a `turn-complete` control record. Carries an + * optional refreshed `publicAccessToken` in a sibling header. + */ +export async function writeTurnCompleteRecord( + apiClient: ApiClient, + sessionId: string, + publicAccessToken?: string +): Promise { + const extra: ReadonlyArray = publicAccessToken + ? [["public-access-token", publicAccessToken]] + : []; + return writeSessionControlRecord( + apiClient, + sessionId, + "out", + TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE, + extra + ); +} + +/** + * Convenience: append an `upgrade-required` control record. + */ +export async function writeUpgradeRequiredRecord( + apiClient: ApiClient, + sessionId: string +): Promise { + return writeSessionControlRecord( + apiClient, + sessionId, + "out", + TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED + ); +} diff --git a/packages/core/src/v3/session-streams-api.ts b/packages/core/src/v3/session-streams-api.ts index afa417a641..4f5c979aa3 100644 --- a/packages/core/src/v3/session-streams-api.ts +++ b/packages/core/src/v3/session-streams-api.ts @@ -5,3 +5,5 @@ import { SessionStreamsAPI } from "./sessionStreams/index.js"; export const sessionStreams = SessionStreamsAPI.getInstance(); export * from "./sessionStreams/types.js"; +export * from "./sessionStreams/wireProtocol.js"; +export * from "./sessionStreams/chatSnapshot.js"; diff --git a/packages/core/src/v3/sessionStreams/chatSnapshot.ts b/packages/core/src/v3/sessionStreams/chatSnapshot.ts new file mode 100644 index 0000000000..f30ea47b46 --- /dev/null +++ b/packages/core/src/v3/sessionStreams/chatSnapshot.ts @@ -0,0 +1,50 @@ +/** + * Persisted chat-snapshot blob. Written by `chat.agent` to S3 after every + * turn completes (when no `hydrateMessages` hook is registered) and read + * back at the start of the next run to seed the accumulator. Also read by + * the Sessions dashboard to render the full conversation transcript + * without re-streaming `session.out` from `seq_num=0`. + * + * S3 key suffix: `sessions/{sessionId}/snapshot.json`. The webapp's + * presigned-URL service prefixes this with `packets/{projectRef}/{envSlug}/`. + * + * `lastOutEventId` is the S2 seq_num (as a string) of the snapshot's + * final `turn-complete` control record. Used to resume `session.out` + * replay from precisely after the snapshot, and as the trim-chain seed + * for the agent's next turn. + * + * The `version` field is a forward-compat lever: readers that don't + * recognise a version silently fall back to no-snapshot behaviour. + */ + +import { z } from "zod"; + +import type { UIMessage } from "ai"; + +export type ChatSnapshotV1 = { + version: 1; + savedAt: number; + messages: TUIMessage[]; + lastOutEventId?: string; +}; + +/** + * Zod schema for `ChatSnapshotV1` with the message shape kept opaque + * (`unknown[]`). The agent runtime types messages strictly via the + * generic parameter; readers that need stricter validation can layer + * their own UIMessage parser on top. + */ +export const ChatSnapshotV1Schema = z.object({ + version: z.literal(1), + savedAt: z.number(), + messages: z.array(z.unknown()), + lastOutEventId: z.string().optional(), +}); + +/** + * S3 key suffix for a session's snapshot blob. The webapp's presigned + * URL routes prefix this with `packets/{projectRef}/{envSlug}/`. + */ +export function chatSnapshotKeySuffix(sessionId: string): string { + return `sessions/${sessionId}/snapshot.json`; +} diff --git a/packages/core/src/v3/sessionStreams/index.ts b/packages/core/src/v3/sessionStreams/index.ts index 75b372c831..7ea1726163 100644 --- a/packages/core/src/v3/sessionStreams/index.ts +++ b/packages/core/src/v3/sessionStreams/index.ts @@ -59,6 +59,18 @@ export class SessionStreamsAPI implements SessionStreamManager { this.#getManager().setLastSeqNum(sessionId, io, seqNum); } + public lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined { + return this.#getManager().lastDispatchedSeqNum(sessionId, io); + } + + public setLastDispatchedSeqNum( + sessionId: string, + io: SessionChannelIO, + seqNum: number + ): void { + this.#getManager().setLastDispatchedSeqNum(sessionId, io, seqNum); + } + public setMinTimestamp( sessionId: string, io: SessionChannelIO, diff --git a/packages/core/src/v3/sessionStreams/manager.ts b/packages/core/src/v3/sessionStreams/manager.ts index 0463cb3fb7..e9cec675ea 100644 --- a/packages/core/src/v3/sessionStreams/manager.ts +++ b/packages/core/src/v3/sessionStreams/manager.ts @@ -7,6 +7,7 @@ import { import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js"; import { SessionChannelIO, SessionStreamManager } from "./types.js"; +import { controlSubtype } from "./wireProtocol.js"; type SessionStreamHandler = (data: unknown) => void | Promise; @@ -43,6 +44,12 @@ export class StandardSessionStreamManager implements SessionStreamManager { private handlers = new Map>(); private onceWaiters = new Map(); private buffer = new Map(); + // Parallel to `buffer`: the SSE seq_num of each buffered record. Same + // length and order as `buffer[key]`. Used so that when `once()` shifts + // a buffered record into a waiter, the cursor (`lastDispatchedSeqNums`) + // can advance to that record's seq. Kept as a separate map so the + // existing `peek()` shape (returns `unknown`) stays unchanged. + private bufferSeqNums = new Map(); private tails = new Map(); // Per-stream lower-bound timestamp filter. When set, records whose // SSE timestamp is <= the bound are dropped before dispatch — used by @@ -58,6 +65,15 @@ export class StandardSessionStreamManager implements SessionStreamManager { // that's already being delivered out-of-band via the waitpoint. private explicitlyDisconnected = new Set(); private seqNums = new Map(); + // Highest seq_num that has been *consumed* (delivered to a once() + // waiter or shifted off the buffer into a once() caller) on a channel. + // Distinct from `seqNums`, which advances whenever any record is + // received from SSE — even ones still sitting in the local buffer. + // The committed-consume cursor is what gets persisted on the + // turn-complete control record's `session-in-event-id` header so the + // next worker boot can resume `.in` from this point without + // re-delivering already-handled user messages. + private lastDispatchedSeqNums = new Map(); // Reconnect attempt counter per key. Drives the exponential backoff // applied by `#ensureTailConnected`'s `.finally` so a persistent // backend failure (auth rejection, 5xx, DNS, etc.) doesn't reconnect @@ -97,6 +113,10 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.#invokeHandler(handler, data); } this.buffer.delete(key); + // Keep `bufferSeqNums` in lock-step with `buffer` — without this, + // the parallel array desyncs and the next `#dispatch` that buffers + // a record would shift a stale seqNum into `lastDispatchedSeqNum`. + this.bufferSeqNums.delete(key); } return { @@ -122,8 +142,14 @@ export class StandardSessionStreamManager implements SessionStreamManager { const buffered = this.buffer.get(key); if (buffered && buffered.length > 0) { const data = buffered.shift()!; + const seqList = this.bufferSeqNums.get(key); + const shiftedSeqNum = seqList?.shift(); if (buffered.length === 0) { this.buffer.delete(key); + this.bufferSeqNums.delete(key); + } + if (shiftedSeqNum !== undefined) { + this.#advanceLastDispatched(key, shiftedSeqNum); } return new InputStreamOncePromise((resolve) => { resolve({ ok: true, output: data }); @@ -185,6 +211,25 @@ export class StandardSessionStreamManager implements SessionStreamManager { } } + lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined { + return this.lastDispatchedSeqNums.get(keyFor(sessionId, io)); + } + + setLastDispatchedSeqNum( + sessionId: string, + io: SessionChannelIO, + seqNum: number + ): void { + this.#advanceLastDispatched(keyFor(sessionId, io), seqNum); + } + + #advanceLastDispatched(key: string, seqNum: number): void { + const current = this.lastDispatchedSeqNums.get(key); + if (current === undefined || seqNum > current) { + this.lastDispatchedSeqNums.set(key, seqNum); + } + } + setMinTimestamp( sessionId: string, io: SessionChannelIO, @@ -203,7 +248,15 @@ export class StandardSessionStreamManager implements SessionStreamManager { const buffered = this.buffer.get(key); if (buffered && buffered.length > 0) { buffered.shift(); - if (buffered.length === 0) this.buffer.delete(key); + const seqList = this.bufferSeqNums.get(key); + const shiftedSeqNum = seqList?.shift(); + if (buffered.length === 0) { + this.buffer.delete(key); + this.bufferSeqNums.delete(key); + } + if (shiftedSeqNum !== undefined) { + this.#advanceLastDispatched(key, shiftedSeqNum); + } return true; } return false; @@ -223,6 +276,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.tails.delete(key); } this.buffer.delete(key); + this.bufferSeqNums.delete(key); // Reset the backoff counter so a future re-attach starts fresh — // an explicit disconnect is a deliberate teardown, not evidence of // a broken backend. @@ -260,6 +314,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { reset(): void { this.disconnect(); this.seqNums.clear(); + this.lastDispatchedSeqNums.clear(); this.minTimestamps.clear(); this.handlers.clear(); this.reconnectAttempts.clear(); @@ -275,6 +330,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { } this.onceWaiters.clear(); this.buffer.clear(); + this.bufferSeqNums.clear(); } #ensureTailConnected(sessionId: string, io: SessionChannelIO): void { @@ -361,6 +417,13 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.seqNums.set(key, seqNum); } + // Trigger control records (turn-complete, upgrade-required) + // are dispatched out-of-band via `onControl` — they're not + // consumer-facing data. Skip the data dispatch path. + if (controlSubtype(part.headers)) { + return; + } + // Min-timestamp filter: drop records older than (or at) the // bound. Used to skip already-processed records on OOM-retry // boot. @@ -377,7 +440,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { // keep as string } } - this.#dispatch(key, data); + this.#dispatch(key, data, Number.isFinite(seqNum) ? seqNum : undefined); }, onComplete: () => { if (this.debug) { @@ -402,7 +465,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { } } - #dispatch(key: string, data: unknown): void { + #dispatch(key: string, data: unknown, seqNum: number | undefined): void { // Any record flowing through = healthy connection; reset the backoff // counter so the next disconnect starts fresh. this.reconnectAttempts.delete(key); @@ -415,6 +478,12 @@ export class StandardSessionStreamManager implements SessionStreamManager { if (waiter.signal && waiter.abortHandler) { waiter.signal.removeEventListener("abort", waiter.abortHandler); } + // Record was consumed directly by a waiter — advance the + // committed-consume cursor immediately. Buffered-then-shifted + // records advance the cursor in `once()` / `shiftBuffer()`. + if (seqNum !== undefined) { + this.#advanceLastDispatched(key, seqNum); + } waiter.resolve({ ok: true, output: data }); this.#invokeHandlers(key, data); return; @@ -434,6 +503,14 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.buffer.set(key, buffered); } buffered.push(data); + if (seqNum !== undefined) { + let bufferedSeqs = this.bufferSeqNums.get(key); + if (!bufferedSeqs) { + bufferedSeqs = []; + this.bufferSeqNums.set(key, bufferedSeqs); + } + bufferedSeqs.push(seqNum); + } } #invokeHandlers(key: string, data: unknown): void { diff --git a/packages/core/src/v3/sessionStreams/noopManager.ts b/packages/core/src/v3/sessionStreams/noopManager.ts index c1c3c38dcd..42d97c9d4e 100644 --- a/packages/core/src/v3/sessionStreams/noopManager.ts +++ b/packages/core/src/v3/sessionStreams/noopManager.ts @@ -31,6 +31,16 @@ export class NoopSessionStreamManager implements SessionStreamManager { setLastSeqNum(_sessionId: string, _io: SessionChannelIO, _seqNum: number): void {} + lastDispatchedSeqNum(_sessionId: string, _io: SessionChannelIO): number | undefined { + return undefined; + } + + setLastDispatchedSeqNum( + _sessionId: string, + _io: SessionChannelIO, + _seqNum: number + ): void {} + setMinTimestamp( _sessionId: string, _io: SessionChannelIO, diff --git a/packages/core/src/v3/sessionStreams/types.ts b/packages/core/src/v3/sessionStreams/types.ts index 2310fabae2..f59d3ee9c7 100644 --- a/packages/core/src/v3/sessionStreams/types.ts +++ b/packages/core/src/v3/sessionStreams/types.ts @@ -45,6 +45,28 @@ export interface SessionStreamManager { /** Advance the last-seen sequence number (prevents SSE replay after `.wait` resume). */ setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void; + /** + * Highest sequence number that has been *consumed* on the channel — + * delivered to a `once()` waiter or shifted off the buffer into one. + * Distinct from {@link lastSeqNum}, which advances on every received + * record regardless of whether anything consumed it. Used by + * `chat.agent` to persist the `.in` resume cursor on each + * `turn-complete` control record so the next worker boot can resume + * the channel from this point without replaying processed messages. + */ + lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined; + + /** + * Seed the committed-consume cursor at worker boot — e.g. from the + * `session-in-event-id` header on the latest `turn-complete` on + * `.out`. Monotonic: only ever advances forward, never backwards. + */ + setLastDispatchedSeqNum( + sessionId: string, + io: SessionChannelIO, + seqNum: number + ): void; + /** * Set a per-stream lower-bound SSE timestamp. Records whose timestamp * is `<= minTimestamp` are dropped before dispatch. Used by chat.agent diff --git a/packages/core/src/v3/sessionStreams/wireProtocol.ts b/packages/core/src/v3/sessionStreams/wireProtocol.ts new file mode 100644 index 0000000000..550e81a0af --- /dev/null +++ b/packages/core/src/v3/sessionStreams/wireProtocol.ts @@ -0,0 +1,93 @@ +/** + * Wire-format constants for records on `session.out` / `session.in`. + * + * Three kinds of records can appear on a Session stream: + * + * 1. **Data records** — JSON body shaped as `{data: , id: + * }`, no special headers. The substance of the conversation. + * + * 2. **Trigger control records** — empty body, `headers` carry `[ + * ["trigger-control", ], ...]` plus any subtype-specific sibling + * headers (e.g. `public-access-token` on `turn-complete`). Routed to a + * consumer's `onControl` callback; never surfaced as data chunks. + * + * 3. **S2 command records** — opaque body, `headers` first entry has an + * empty name (only valid for S2-interpreted directives like `trim` and + * `fence`). Filtered out at the SSE parser; consumers never see them. + * + * See `docs/ai-chat/client-protocol.mdx#records-on-session-out` for the + * customer-facing contract. + */ + +/** Header name carrying the Trigger control subtype on control records. */ +export const TRIGGER_CONTROL_HEADER = "trigger-control" as const; + +/** Header name carrying the refreshed `publicAccessToken` on `turn-complete`. */ +export const PUBLIC_ACCESS_TOKEN_HEADER = "public-access-token" as const; + +/** Header name carrying the agent's last S2 event id on a handover bridge. */ +export const SESSION_STATE_LAST_EVENT_ID_HEADER = "last-event-id" as const; + +/** + * Header on `turn-complete` records carrying the highest `session.in` + * seq_num the agent committed to processing during this turn. Read on + * the next worker boot to seed `.in`'s resume cursor — anything past + * this seq is new and gets delivered; anything at-or-before was already + * processed and is skipped. Decimal-string form of the seq_num. + * + * Omitted when no `.in` records have been consumed yet (first turn of a + * fresh chat triggered via the wire payload). + */ +export const SESSION_IN_EVENT_ID_HEADER = "session-in-event-id" as const; + +export const TRIGGER_CONTROL_SUBTYPE = { + TURN_COMPLETE: "turn-complete", + UPGRADE_REQUIRED: "upgrade-required", +} as const; + +export type TriggerControlSubtype = + (typeof TRIGGER_CONTROL_SUBTYPE)[keyof typeof TRIGGER_CONTROL_SUBTYPE]; + +/** Read a single header value by name. Returns the first match. */ +export function headerValue( + headers: ReadonlyArray | undefined, + name: string +): string | undefined { + if (!headers) return undefined; + for (const entry of headers) { + if (entry?.[0] === name) return entry[1]; + } + return undefined; +} + +/** + * Return the Trigger control subtype carried by a record's headers, if any. + * Returns `undefined` for data records and S2 command records. + */ +export function controlSubtype( + headers: ReadonlyArray | undefined +): string | undefined { + return headerValue(headers, TRIGGER_CONTROL_HEADER); +} + +/** + * Is this record an S2 command record? Detected via the empty-name first + * header, which S2 permits only for command records (trim/fence). + */ +export function isS2CommandRecord( + headers: ReadonlyArray | undefined +): boolean { + return headers?.[0]?.[0] === ""; +} + +/** Event payload delivered to a Session-stream `onControl` callback. */ +export type ControlEvent = { + /** Subtype value from the `trigger-control` header (e.g. `turn-complete`). */ + subtype: string; + /** All headers on the underlying record. Read additional metadata here. */ + headers: ReadonlyArray; + /** S2 sequence number of the control record. */ + seqNum: number; + /** S2 arrival timestamp of the control record (ms since epoch). */ + timestamp: number; +}; diff --git a/packages/core/src/v3/test/test-session-stream-manager.ts b/packages/core/src/v3/test/test-session-stream-manager.ts index 493093d686..a688790d61 100644 --- a/packages/core/src/v3/test/test-session-stream-manager.ts +++ b/packages/core/src/v3/test/test-session-stream-manager.ts @@ -145,6 +145,21 @@ export class TestSessionStreamManager implements SessionStreamManager { this.seqNums.set(keyFor(sessionId, io), seqNum); } + lastDispatchedSeqNum(_sessionId: string, _io: SessionChannelIO): number | undefined { + // The test harness drives records via `__sendFromTest` without seq + // numbers, so the committed-consume cursor stays undefined. Tests + // that need cursor behaviour exercise it via the real manager. + return undefined; + } + + setLastDispatchedSeqNum( + _sessionId: string, + _io: SessionChannelIO, + _seqNum: number + ): void { + // no-op — see comment on `lastDispatchedSeqNum`. + } + setMinTimestamp( _sessionId: string, _io: SessionChannelIO, diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 63a129ae98..19f21aaa2c 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -2,7 +2,9 @@ import { accessoryAttributes, AnyTask, apiClientManager, + controlSubtype, getSchemaParseFn, + headerValue, InputStreamOncePromise, type InputStreamOnceOptions, type InputStreamWaitOptions, @@ -31,6 +33,8 @@ import { type TaskSchema, type TaskRunContext, type TaskWithSchema, + SESSION_IN_EVENT_ID_HEADER, + TRIGGER_CONTROL_SUBTYPE, type WriterStreamOptions, } from "@trigger.dev/core/v3"; import type { @@ -42,7 +46,7 @@ import type { UIMessageStreamOptions, LanguageModelUsage, } from "ai"; -import type { StreamWriteResult } from "@trigger.dev/core/v3"; +import type { ChatSnapshotV1, StreamWriteResult } from "@trigger.dev/core/v3"; import { convertToModelMessages, dynamicTool, @@ -132,73 +136,81 @@ const chatTurnContextKey = locals.create("chat.turnContext"); const chatSessionHandleKey = locals.create("chat.sessionHandle"); /** - * Scan `session.out` for the latest `trigger:turn-complete` chunk and - * return its SSE timestamp. Used at OOM-retry boot to derive a - * lower-bound timestamp for the `session.in` filter — records older - * than `T_last_complete` belong to turns that already completed on the - * prior attempt and are dropped before they reach the turn loop. + * S2 seq_num of the most recent `turn-complete` control record written by + * this worker. Read by `writeTurnCompleteChunk` to know what to trim back + * to when the next turn finishes, keeping `session.out` bounded to ~one + * turn at steady state. * - * Implementation is a streaming scan: subscribes via the existing SSE - * endpoint with a short `timeoutInSeconds`, processes each part inline, - * and discards the chunk body so memory stays O(1) regardless of how - * many records are on `session.out`. Bandwidth scales linearly with - * stream length but the scan only fires on retry — a rare event. - * - * Returns `undefined` if no `trigger:turn-complete` chunk has been - * written yet (first-turn OOM, no completed turns to dedup against). + * Seeded at boot from `ChatSnapshotV1.lastOutEventId` (which is exactly + * the previous turn-complete's seq_num). Wrapped in a mutable holder so + * `writeTurnCompleteChunk` can advance it without going through a setter. * @internal */ -async function findLatestTurnCompleteTimestamp( +const lastTurnCompleteSeqNumKey = locals.create<{ value: number | undefined }>( + "chat.lastTurnCompleteSeqNum" +); + +/** + * Scan `session.out` for the latest `turn-complete` control record and + * return its `session-in-event-id` header value — the committed-consume + * cursor on `.in` as of that turn-complete. Used at worker boot to seed + * the `.in` subscription so already-processed user messages don't get + * replayed from S2. + * + * Implementation streams the SSE endpoint and listens for `turn-complete` + * via the transport's `onControl` callback; the data-chunk for-await is + * just there to drive the stream. The scan is O(1 turn) because + * `session.out` is bounded to roughly one turn at steady state — every + * successful turn-complete is followed by an S2 trim back to the + * previous one (see `writeTurnCompleteChunk`). + * + * Returns `undefined` if no `turn-complete` carrying the header has been + * written yet — first-turn-ever, first turn post-OOM-with-no-prior-runs, + * or a `turn-complete` written before this header existed (cross-version + * boot). Callers fall back to subscribing `.in` from seq 0 in that case; + * the slim-wire merge handles any dedup against snapshot-restored + * messages. + * @internal + */ +async function findLatestSessionInCursor( chatId: string ): Promise { const apiClient = apiClientManager.clientOrThrow(); - let latestTs: number | undefined; + let latestCursor: number | undefined; const stream = await apiClient.subscribeToSessionStream(chatId, "out", { timeoutInSeconds: 1, - onPart: (part) => { - let chunk: unknown = part.chunk; - if (typeof chunk === "string") { - try { - chunk = JSON.parse(chunk); - } catch { - return; - } - } - if (chunk && typeof chunk === "object" && (chunk as { type?: unknown }).type === "trigger:turn-complete") { - latestTs = part.timestamp; - } + onControl: (event) => { + if (event.subtype !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) return; + const raw = headerValue(event.headers, SESSION_IN_EVENT_ID_HEADER); + if (!raw) return; + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed)) latestCursor = parsed; }, }); - // Drain the stream to drive `onPart`. We don't accumulate the chunks — - // each iteration discards the data immediately, so a long session.out - // doesn't blow memory on the retry-boot worker. + // Drain the stream so the underlying SSE reader runs to completion. We + // don't accumulate chunks; `onControl` fires inline as turn-complete + // records arrive. for await (const _ of stream) { // intentionally empty } - return latestTs; + return latestCursor; } /** * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the * accumulator with prior conversation state, replacing the old wire-borne - * full-history seed. Only the runtime owns this format — customers never - * touch it. + * full-history seed. * - * `lastOutEventId` is the SSE Last-Event-ID after the snapshot's final - * chunk, used to resume `session.out` replay from precisely after the - * snapshot. `lastOutTimestamp` is the same chunk's timestamp, used to - * skip `findLatestTurnCompleteTimestamp` on OOM retry boot. + * The shape is shared with the Sessions dashboard (which reads the same + * blob to render the full conversation transcript) via + * `@trigger.dev/core/v3`. Customer code shouldn't reach in here — the + * SDK transports surface the messages through the standard `messages` + * accumulator. * * @internal */ -export type ChatSnapshotV1 = { - version: 1; - savedAt: number; - messages: TUIMessage[]; - lastOutEventId?: string; - lastOutTimestamp?: number; -}; +export type { ChatSnapshotV1 } from "@trigger.dev/core/v3"; /** * S3 key suffix for a session's snapshot blob. The webapp's presigned-URL @@ -4570,6 +4582,9 @@ function chatAgent< // `chat.createStartSessionAction` or browser-direct) before this // run is triggered — no client-side upsert needed here. locals.set(chatSessionHandleKey, sessions.open(payload.chatId)); + // Mutable holder; advances in `writeTurnCompleteChunk` after each turn + // and is the trim target for the NEXT turn's trim record. + locals.set(lastTurnCompleteSeqNumKey, { value: undefined }); taskContext.setConversationId(payload.chatId); // Stamp `gen_ai.conversation.id` on the run-level span. Every @@ -4651,6 +4666,20 @@ function chatAgent< }); } + // Seed the trim chain from the snapshot's `lastOutEventId` (the SSE + // id of the previous turn's `turn-complete` control record). The + // first turn-complete this worker writes will then trim back to it. + // Without seeding, the new worker would emit no trim on its first + // turn (chain self-bootstraps from turn 2), so this is purely an + // optimization to keep continuation runs bounded from the first turn. + if (bootSnapshot?.lastOutEventId !== undefined) { + const seeded = Number.parseInt(bootSnapshot.lastOutEventId, 10); + if (Number.isFinite(seeded)) { + const slot = locals.get(lastTurnCompleteSeqNumKey); + if (slot) slot.value = seeded; + } + } + try { replayed = await tracer.startActiveSpan("chat.boot.replay", async () => replaySessionOutTail(sessionIdForSnapshot, { @@ -4665,47 +4694,46 @@ function chatAgent< } } - // ── session.in dedup cutoff ──────────────────────────────────── + // ── session.in resume cursor ─────────────────────────────────── // // A fresh worker subscribes to `session.in` from seq 0 and would // re-deliver every record ever appended — including user messages - // from turns already completed on a prior run. Without dedup, the - // loop would re-process them as fresh turns and the slim-wire merge - // would replace-by-id against the snapshot-restored copies, yielding - // no-op replaces while the customer's actual new message waits in - // the queue. + // from turns already completed on a prior run. Without a cursor, + // the loop would re-process them as fresh turns and the slim-wire + // merge would replace-by-id against snapshot-restored copies, + // yielding no-op replaces while the customer's actual new message + // waits in the queue. // - // The cutoff is the timestamp of the last `trigger:turn-complete` - // chunk on `session.out`. When we have a snapshot, that timestamp is - // already in `lastOutTimestamp` — use it directly to skip the - // O(stream-length) scan. Fall back to the scan only when no snapshot - // is available (first-ever OOM retry, or `hydrateMessages` - // short-circuited the snapshot read). + // The cursor is the seq_num of the last `.in` record the prior + // worker committed to processing, persisted on each `turn-complete` + // control record as a `session-in-event-id` sibling header. The + // boot scan reads the header off `.out`'s latest turn-complete and + // seeds the manager so the upcoming `.in` SSE subscribe opens with + // `Last-Event-ID: ` — S2 starts after that seq and old + // messages never reach this worker. // - // Applies in three cases (any of which means session.in has records + // Applies in three cases (any of which means `.in` has records // belonging to completed turns the new run should skip): // - OOM retry (`ctx.attempt.number > 1`) // - Continuation run (`payload.continuation === true`) — prior run // crashed / was canceled / requested upgrade // - Snapshot exists at all (catches edge cases where the wire // didn't set `continuation` but a snapshot indicates prior turns) - const needsDedupCutoff = + const needsResumeCursor = ctx.attempt.number > 1 || payload.continuation === true || bootSnapshot !== undefined; - if (needsDedupCutoff) { + if (needsResumeCursor) { try { - let cutoff = bootSnapshot?.lastOutTimestamp; - if (cutoff === undefined) { - cutoff = await findLatestTurnCompleteTimestamp(payload.chatId); - } - if (cutoff !== undefined) { - sessionStreams.setMinTimestamp(payload.chatId, "in", cutoff); + const cursor = await findLatestSessionInCursor(payload.chatId); + if (cursor !== undefined) { + sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); + sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); } } catch (error) { logger.warn( - "chat.agent: session.in dedup cutoff lookup failed; old messages may replay", + "chat.agent: session.in resume cursor lookup failed; old messages may replay", { error: error instanceof Error ? error.message : String(error) } ); } @@ -6409,16 +6437,7 @@ function chatAgent< version: 1, savedAt: Date.now(), messages: accumulatedUIMessages, - // `StreamWriteResult` exposes `lastEventId` only; - // use the snapshot save time as the - // `lastOutTimestamp` cutoff hint. The OOM-retry - // optimization compares this to SSE chunk - // timestamps (ms epoch on the server) — Date.now() - // here is the closest cheap approximation - // available client-side and is consistent with - // the existing turn-complete chunk emission. lastOutEventId: turnCompleteResult?.lastEventId, - lastOutTimestamp: Date.now(), }); }, { @@ -8657,27 +8676,74 @@ export const chat = { }; /** - * Writes a turn-complete control chunk to the chat output stream. - * The frontend transport intercepts this to close the ReadableStream for the current turn. + * Writes a `turn-complete` control record to the chat output stream and, + * if we have a prior turn-complete's seq_num, appends an S2 `trim` command + * record back to it — keeping `session.out` bounded to roughly one turn + * at steady state. + * + * The control record's body is empty; `trigger-control: turn-complete` + * plus an optional `public-access-token` ride on the headers (see + * `docs/ai-chat/client-protocol.mdx`). SDK transports filter it from the + * consumer chunk stream and surface it via `onControl` / `onTurnComplete`. + * + * Trim is opportunistic and monotonic at S2's layer. A failed trim is + * logged and swallowed; the next turn will retry against a fresher + * target seq_num. + * * @internal */ async function writeTurnCompleteChunk( - chatId?: string, + _chatId?: string, publicAccessToken?: string ): Promise { - const { waitUntilComplete } = chatStream.writer({ - spanName: "turn complete", - collapsed: true, - execute: ({ write }) => { - // Transport-intercepted control chunk — not a valid UIMessageChunk - // type but travels on the same session.out stream. - write({ - type: "trigger:turn-complete", - ...(publicAccessToken ? { publicAccessToken } : {}), - } as unknown as UIMessageChunk); - }, - }); - return await waitUntilComplete(); + const session = getChatSession(); + + // 1. Write the turn-complete control record. The ack's `lastEventId` is + // this record's seq_num — that's the trim target for the NEXT turn. + // + // Sibling headers: + // - `public-access-token` (optional): refresh token surfaced to + // browser-side transports via `onTurnComplete`. + // - `session-in-event-id` (optional): the committed-consume cursor + // on `.in` as of this turn-complete. On next worker boot, the + // boot scan reads this back and seeds the `.in` subscription so + // already-processed user messages aren't re-delivered. + const extraHeaders: Array<[string, string]> = []; + if (publicAccessToken) { + extraHeaders.push(["public-access-token", publicAccessToken]); + } + const inCursor = session.in.lastDispatchedSeqNum(); + if (inCursor !== undefined) { + extraHeaders.push([SESSION_IN_EVENT_ID_HEADER, String(inCursor)]); + } + const result = await session.out.writeControl( + TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE, + extraHeaders + ); + const T_N = result.lastEventId ? Number.parseInt(result.lastEventId, 10) : undefined; + + // 2. Trim back to the previous turn-complete, if we have one. Skipping on + // first-turn-ever (or first turn post-OOM without a snapshot seed) is + // fine — the chain catches up next turn. + const slot = locals.get(lastTurnCompleteSeqNumKey); + const prev = slot?.value; + if (slot && prev !== undefined) { + try { + await session.out.trimTo(prev); + } catch (err) { + logger.warn("chat.agent: trim failed; will retry next turn", { + error: err instanceof Error ? err.message : String(err), + prev, + }); + } + } + + // 3. Advance the slot so the next turn-complete trims back to this one. + if (slot && T_N !== undefined && Number.isFinite(T_N)) { + slot.value = T_N; + } + + return result; } /** @@ -8725,16 +8791,8 @@ async function writeUpgradeRequiredChunk(): Promise { } } - const { waitUntilComplete } = chatStream.writer({ - spanName: "upgrade required", - collapsed: true, - execute: ({ write }) => { - write({ - type: "trigger:upgrade-required", - } as unknown as UIMessageChunk); - }, - }); - return await waitUntilComplete(); + const session = getChatSession(); + return session.out.writeControl(TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED); } /** diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 639012cdb8..6fc88df782 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -19,7 +19,13 @@ import type { SessionTriggerConfig, Task } from "@trigger.dev/core/v3"; import type { ModelMessage, UIMessage, UIMessageChunk } from "ai"; import { readUIMessageStream } from "ai"; -import { ApiClient, SSEStreamSubscription, apiClientManager } from "@trigger.dev/core/v3"; +import { + ApiClient, + apiClientManager, + controlSubtype, + SSEStreamSubscription, + TRIGGER_CONTROL_SUBTYPE, +} from "@trigger.dev/core/v3"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; import { sessions } from "./sessions.js"; @@ -710,33 +716,19 @@ export class AgentChat { if (value.id) state.lastEventId = value.id; - // Session records arrive as raw JSON strings (the server - // wraps `{data, id}` on S2). Parse back into objects so - // the control-flow below can inspect chunk.type. - let chunkObj: Record | null = null; - if (value.chunk != null) { - if (typeof value.chunk === "string") { - try { - chunkObj = JSON.parse(value.chunk) as Record; - } catch { - chunkObj = null; - } - } else if (typeof value.chunk === "object") { - chunkObj = value.chunk as Record; - } - } - if (!chunkObj) continue; - - const chunk = chunkObj; + // Trigger control records (turn-complete, upgrade-required) + // route by header — see `client-protocol.mdx`. Their bodies + // are empty; everything substantive is on `value.headers`. + const controlValue = controlSubtype(value.headers); if (state.skipToTurnComplete) { - if (chunk.type === "trigger:turn-complete") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { state.skipToTurnComplete = false; } continue; } - if (chunk.type === "trigger:upgrade-required") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED) { // Server has already triggered the new run via // `end-and-continue`; v2's chunks arrive on the same // S2 stream. Filter the marker for cleanliness and @@ -744,7 +736,7 @@ export class AgentChat { continue; } - if (chunk.type === "trigger:turn-complete") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { // Customer's callback may be async (e.g. persisting // lastEventId to a DB). Wrap so a rejected Promise // doesn't surface as an unhandled rejection — that @@ -764,7 +756,11 @@ export class AgentChat { return; } - controller.enqueue(chunk as unknown as UIMessageChunk); + // Data record — `value.chunk` is the parsed UIMessageChunk + // (the SSE parser does the JSON envelope unwrap). Drop + // empty/malformed payloads defensively. + if (value.chunk == null) continue; + controller.enqueue(value.chunk as UIMessageChunk); } } catch (readError) { reader.releaseLock(); diff --git a/packages/trigger-sdk/src/v3/chat-server.ts b/packages/trigger-sdk/src/v3/chat-server.ts index 7ef090a6e6..fe71fe3d86 100644 --- a/packages/trigger-sdk/src/v3/chat-server.ts +++ b/packages/trigger-sdk/src/v3/chat-server.ts @@ -54,7 +54,12 @@ * helpers like `stepCountIs` / `convertToModelMessages`). */ -import { ApiClient, SessionStreamInstance, apiClientManager } from "@trigger.dev/core/v3"; +import { + ApiClient, + SessionStreamInstance, + TRIGGER_CONTROL_SUBTYPE, + apiClientManager, +} from "@trigger.dev/core/v3"; import { convertToModelMessages, generateId as generateAssistantMessageId, @@ -551,7 +556,17 @@ async function openHandoverSession(opts: { // transport can hydrate `state.lastEventId` for turn 2's // subscribe — without it, turn 2 reads session.out from the // start and replays turn 1 to the user. + // + // The agent's `turn-complete` control record is now header- + // form on S2 (see `client-protocol.mdx`), so the + // `for await (const chunk of agentStream)` loop below NEVER + // sees it as a data chunk — `subscribeToSessionStream` routes + // it to `onControl`. Use that to know when to stop and + // synthesise the data-chunk shape the browser bridge still + // expects (this HTTP response stream is NOT S2 and keeps the + // legacy chunk shape for the customer-server-to-browser hop). let latestEventId: string | undefined; + let turnComplete = false; const agentStream = await apiClient.subscribeToSessionStream( chatId, "out", @@ -563,22 +578,29 @@ async function openHandoverSession(opts: { onPart: (part) => { if (part.id) latestEventId = part.id; }, + onControl: (event) => { + if (event.subtype === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + turnComplete = true; + // Synthesise the data-chunk shape for the browser + // bridge. The customer-server-to-browser response is + // not S2; it keeps the legacy chunk shape so the + // browser's transport can recognise turn-complete the + // same way it always has. + controller.enqueue({ + type: "trigger:turn-complete", + } as unknown as UIMessageChunk); + } + }, } ); for await (const chunk of agentStream) { + // Data records only — control records are routed via + // `onControl` above. Stop reading as soon as we see the + // turn-complete control event (the loop may have one more + // data record buffered, but that's fine — we break out). controller.enqueue(chunk); - // The agent's run-loop emits `trigger:turn-complete` when - // the turn finishes. That's our cue to close — anything - // after is the next turn (which goes via the direct - // `session.in`/`session.out` path, not this endpoint). - if ( - chunk && - typeof chunk === "object" && - (chunk as { type?: unknown }).type === "trigger:turn-complete" - ) { - break; - } + if (turnComplete) break; } // Final control chunk: hand the browser transport the diff --git a/packages/trigger-sdk/src/v3/chat.test.ts b/packages/trigger-sdk/src/v3/chat.test.ts index eaa69bed93..6d034e7cb5 100644 --- a/packages/trigger-sdk/src/v3/chat.test.ts +++ b/packages/trigger-sdk/src/v3/chat.test.ts @@ -19,10 +19,45 @@ import { TriggerChatTransport, createChatTransport } from "./chat.js"; * parse-once, `=== "object"` → use as-is). We pick the object form * here for test simplicity. */ +/** + * Encode test chunks as a session-stream v2 SSE batch event. Each chunk + * becomes one S2 record; chunks of shape `{type: "trigger:turn-complete"}` + * or `{type: "trigger:upgrade-required"}` are translated into header-form + * control records (empty body, `trigger-control` header) to match the + * production wire shape. + */ function sseEncode(chunks: (UIMessageChunk | Record)[]): string { - return chunks - .map((chunk, i) => `id: ${i}\ndata: ${JSON.stringify(chunk)}\n\n`) - .join(""); + let nextSeq = 1; + const records = chunks.map((chunk, i) => { + const partId = `p-${i}`; + const type = (chunk as { type?: unknown }).type; + if (type === "trigger:turn-complete") { + const headers: Array<[string, string]> = [["trigger-control", "turn-complete"]]; + const token = (chunk as { publicAccessToken?: string }).publicAccessToken; + if (token) headers.push(["public-access-token", token]); + return { + body: "", + seq_num: nextSeq++, + timestamp: 1700000000000 + i, + headers, + }; + } + if (type === "trigger:upgrade-required") { + return { + body: "", + seq_num: nextSeq++, + timestamp: 1700000000000 + i, + headers: [["trigger-control", "upgrade-required"]], + }; + } + return { + body: JSON.stringify({ data: chunk, id: partId }), + seq_num: nextSeq++, + timestamp: 1700000000000 + i, + headers: [], + }; + }); + return `event: batch\ndata: ${JSON.stringify({ records })}\n\n`; } function createSSEStream(sseText: string): ReadableStream { @@ -127,7 +162,13 @@ function defaultSseResponse( ): Response { return new Response(createSSEStream(sseEncode(chunks)), { status: 200, - headers: { "content-type": "text/event-stream" }, + headers: { + "content-type": "text/event-stream", + // Session streams are always v2 in production — batch format + // with one S2 record per SSE event. The legacy v1 path is for + // run-scoped Redis streams. + "X-Stream-Version": "v2", + }, }); } diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 980d34c1f0..5b30d56ca4 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -24,7 +24,14 @@ */ import type { ChatTransport, UIMessage, UIMessageChunk, ChatRequestOptions } from "ai"; -import { ApiClient, SSEStreamSubscription } from "@trigger.dev/core/v3"; +import { + ApiClient, + controlSubtype, + headerValue, + PUBLIC_ACCESS_TOKEN_HEADER, + SSEStreamSubscription, + TRIGGER_CONTROL_SUBTYPE, +} from "@trigger.dev/core/v3"; import { ChatTabCoordinator } from "./chat-tab-coordinator.js"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; @@ -980,10 +987,12 @@ export class TriggerChatTransport implements ChatTransport { /** * Open an SSE subscription to the session's `.out` stream and pipe - * UIMessageChunks through to the AI SDK. Filters control chunks - * (`trigger:turn-complete`, `trigger:upgrade-required`) — the latter - * is purely telemetry now since the server handles the run swap - * inline (see `end-and-continue`). + * UIMessageChunks through to the AI SDK. Trigger control records + * (`turn-complete`, `upgrade-required` — see `trigger-control` header + * on `client-protocol.mdx#records-on-session-out`) are routed by + * header and never reach the consumer. `upgrade-required` is purely + * telemetry now since the server handles the run swap inline (see + * `end-and-continue`). */ private subscribeToSessionStream( state: ChatSessionState, @@ -1144,7 +1153,12 @@ export class TriggerChatTransport implements ChatTransport { } while (true) { - let value: { id: string; chunk: unknown; timestamp: number }; + let value: { + id: string; + chunk: unknown; + timestamp: number; + headers?: ReadonlyArray; + }; if (primed !== undefined) { value = primed; primed = undefined; @@ -1166,32 +1180,20 @@ export class TriggerChatTransport implements ChatTransport { if (value.id) state.lastEventId = value.id; - // Session SSE delivers raw record bodies as strings (the - // server wraps them in `{data, id}` for S2). Parse so the - // rest of the loop can treat chunks as objects. - let chunkObj: Record | null = null; - if (value.chunk != null) { - if (typeof value.chunk === "string") { - try { - chunkObj = JSON.parse(value.chunk) as Record; - } catch { - chunkObj = null; - } - } else if (typeof value.chunk === "object") { - chunkObj = value.chunk as Record; - } - } - if (!chunkObj) continue; - const chunk = chunkObj; + // Trigger control record (turn-complete, upgrade-required) — + // routed by header, body is empty. Detect via the + // `trigger-control` header on the SSE record. Data records + // (UIMessageChunks) fall through to the chunk path below. + const controlValue = controlSubtype(value.headers); if (state.skipToTurnComplete) { - if (chunk.type === "trigger:turn-complete") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { state.skipToTurnComplete = false; } continue; } - if (chunk.type === "trigger:upgrade-required") { + if (controlValue === TRIGGER_CONTROL_SUBTYPE.UPGRADE_REQUIRED) { // Server has already triggered the new run via // `end-and-continue`; the next chunks on this same `.out` // stream come from v2. Filter the marker for cleanliness @@ -1199,9 +1201,10 @@ export class TriggerChatTransport implements ChatTransport { continue; } - if (chunk.type === "trigger:turn-complete") { - if (typeof chunk.publicAccessToken === "string") { - state.publicAccessToken = chunk.publicAccessToken; + if (controlValue === TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) { + const refreshedToken = headerValue(value.headers, PUBLIC_ACCESS_TOKEN_HEADER); + if (refreshedToken) { + state.publicAccessToken = refreshedToken; } state.isStreaming = false; this.notifySessionChange(chatId, state); @@ -1221,7 +1224,11 @@ export class TriggerChatTransport implements ChatTransport { return; } - controller.enqueue(chunk as unknown as UIMessageChunk); + // Data record — `value.chunk` is the parsed UIMessageChunk + // unwrapped from the S2 record envelope (the parser does the + // JSON unwrap). Drop empty/malformed payloads defensively. + if (value.chunk == null) continue; + controller.enqueue(value.chunk as UIMessageChunk); } } catch (error) { if (error instanceof Error && error.name === "AbortError") { diff --git a/packages/trigger-sdk/src/v3/sessions.ts b/packages/trigger-sdk/src/v3/sessions.ts index 3763a27146..663dbbebc3 100644 --- a/packages/trigger-sdk/src/v3/sessions.ts +++ b/packages/trigger-sdk/src/v3/sessions.ts @@ -31,7 +31,10 @@ import { runtime, sessionStreams, taskContext, + trimSessionStream, + writeSessionControlRecord, } from "@trigger.dev/core/v3"; +import type { ControlEvent, StreamWriteResult } from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { SpanStatusCode } from "@opentelemetry/api"; import { tracer } from "./tracer.js"; @@ -390,6 +393,7 @@ export class SessionOutputChannel { lastEventId: options?.lastEventId != null ? String(options.lastEventId) : undefined, onPart: options?.onPart, + onControl: options?.onControl, onComplete: options?.onComplete, onError: options?.onError, }); @@ -464,6 +468,38 @@ export class SessionOutputChannel { throw error; } } + + /** + * Write a single Trigger control record to `.out`. The record carries a + * `trigger-control` header valued with `subtype` plus any sibling + * `extraHeaders`; the body is empty. Control records are filtered out of + * the consumer-facing chunk stream by the SDK transport — readers route + * them via the `onControl` callback instead. + * + * The returned `lastEventId` is the S2 seq_num of the written record, + * useful for trim chains (e.g. trim back to the previous turn-complete). + */ + async writeControl( + subtype: string, + extraHeaders?: ReadonlyArray + ): Promise { + const apiClient = apiClientManager.clientOrThrow(); + return writeSessionControlRecord(apiClient, this.sessionId, "out", subtype, extraHeaders); + } + + /** + * Append an S2 `trim` command record to `.out`. Records with seq_num + * less than `earliestSeqNum` are eventually removed from the stream. + * + * Idempotent and monotonic at S2's layer (`max(existing, min(provided, + * current_tail))`) — backward trims are silently no-ops for deletion + * but still consume a seq_num. Used by `chat.agent`'s turn loop to + * keep `session.out` bounded to roughly one turn at steady state. + */ + async trimTo(earliestSeqNum: number): Promise { + const apiClient = apiClientManager.clientOrThrow(); + await trimSessionStream(apiClient, this.sessionId, earliestSeqNum); + } } /** @@ -557,6 +593,20 @@ export class SessionInputChannel { return sessionStreams.peek(this.sessionId, "in") as T | undefined; } + /** + * The highest S2 sequence number of any record this channel has + * delivered to a `once()` / `wait()` consumer (or had shifted off its + * buffer into one). Distinct from "last received" — buffered-but-not- + * yet-consumed records don't count. + * + * Used by `chat.agent` to persist the `.in` resume cursor on each + * `turn-complete` control record, so the next worker boot can subscribe + * past already-processed user messages. + */ + lastDispatchedSeqNum(): number | undefined { + return sessionStreams.lastDispatchedSeqNum(this.sessionId, "in"); + } + /** * Suspend the current run until the next record arrives on `.in`. * Unlike {@link once}, `wait()` frees compute while blocked — the @@ -727,6 +777,13 @@ export type SessionSubscribeOptions = { timeoutInSeconds?: number; /** Called for each SSE event with the full event metadata (id, timestamp). */ onPart?: (part: { id: string; chunk: T; timestamp: number }) => void; + /** + * Called when a `trigger-control` record arrives on the stream (e.g. + * `turn-complete`, `upgrade-required`). Control records are filtered + * out of the consumer chunk stream — handle them here. See + * `docs/ai-chat/client-protocol.mdx` for the wire shape. + */ + onControl?: (event: ControlEvent) => void; /** Called when the server signals end-of-stream. */ onComplete?: () => void; /** Called on unrecoverable errors after the retry budget is exhausted. */ diff --git a/packages/trigger-sdk/src/v3/test/test-session-handle.ts b/packages/trigger-sdk/src/v3/test/test-session-handle.ts index 71bc9d8d7b..93d6146ddd 100644 --- a/packages/trigger-sdk/src/v3/test/test-session-handle.ts +++ b/packages/trigger-sdk/src/v3/test/test-session-handle.ts @@ -242,6 +242,39 @@ export class TestSessionOutputChannel extends SessionOutputChannel { "inspect `harness.allChunks` / `harness.allRawChunks` instead." ); } + + /** + * Override the one-shot control-record path. In production this goes + * direct to S2 with header-form records; in tests we project it back + * into the chunk-shape the harness already understands (the listener + * watches for `{type: "trigger:turn-complete"}` to drive turn-complete + * latches). Returns an empty `StreamWriteResult` — tests don't observe + * the seq_num, and trim seeding only matters in production. + */ + async writeControl( + subtype: string, + extraHeaders?: ReadonlyArray + ): Promise { + const synthetic: Record = { type: `trigger:${subtype}` }; + if (extraHeaders) { + for (const [name, value] of extraHeaders) { + if (name === "public-access-token") { + synthetic.publicAccessToken = value; + } + } + } + notify(this.state, synthetic); + return {}; + } + + /** + * No-op in the mock harness. Production trims keep `session.out` bounded; + * the in-memory `state.chunks` array doesn't need trimming and tests + * that care about trim behaviour exercise it via the real S2 code path. + */ + async trimTo(_earliestSeqNum: number): Promise { + // Intentionally a no-op for the mock harness. + } } /** diff --git a/packages/trigger-sdk/test/chat-snapshot.test.ts b/packages/trigger-sdk/test/chat-snapshot.test.ts index e7421cdbd9..cac3364639 100644 --- a/packages/trigger-sdk/test/chat-snapshot.test.ts +++ b/packages/trigger-sdk/test/chat-snapshot.test.ts @@ -29,7 +29,6 @@ function buildSnapshot(count = 1): ChatSnapshotV1 { parts: [{ type: "text" as const, text: `hello ${i}` }], })), lastOutEventId: "evt-42", - lastOutTimestamp: 2_000_000, }; }