Skip to content

Commit b3aa85b

Browse files
committed
feat(webapp,core,sdk,cli): bound session.out via per-turn trim
After each `trigger:turn-complete`, the agent appends an S2 `trim` command back to the previous turn-complete's seq_num. `session.out` stays roughly one turn long at steady state, regardless of how long the chat has been running. `trigger:turn-complete` and `trigger:upgrade-required` move from `chunk.type`-shaped data records into header-form control records under a `trigger-control` namespace. Built-in transports (`TriggerChatTransport`, `AgentChat`, dashboard `AgentView`) handle this transparently; custom transports need a one-line filter on the `trigger-control` header. The Sessions detail page in the dashboard fetches the per-turn S3 snapshot via a presigned URL and seeds the transcript view, then SSE-tails from the snapshot's `lastOutEventId`. Bandwidth and time-to-first-render scale with unread turns, not session lifetime.
1 parent a8280f1 commit b3aa85b

23 files changed

Lines changed: 1151 additions & 238 deletions

File tree

apps/webapp/app/components/runs/v3/agent/AgentView.tsx

Lines changed: 111 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { UIMessage } from "@ai-sdk/react";
2-
import { SSEStreamSubscription } from "@trigger.dev/core/v3";
2+
import { ChatSnapshotV1Schema, SSEStreamSubscription } from "@trigger.dev/core/v3";
33
import { useEffect, useMemo, useRef, useState } from "react";
44
import { Paragraph } from "~/components/primitives/Paragraph";
55
import { Spinner } from "~/components/primitives/Spinner";
@@ -27,6 +27,15 @@ export type AgentViewAuth = {
2727
* channel and is merged in by the AgentView subscription.
2828
*/
2929
initialMessages: UIMessage[];
30+
/**
31+
* Presigned GET URL for the session's chat-snapshot S3 blob (written
32+
* by the agent after each turn-complete; see `ChatSnapshotV1`).
33+
* Optional — sessions that registered a `hydrateMessages` hook skip
34+
* snapshot writes and the URL fetch will 404. In that case the
35+
* dashboard falls back to seq=0 SSE (which, post-trim, shows only the
36+
* most recent turn). Generated server-side by `SessionPresenter`.
37+
*/
38+
snapshotPresignedUrl?: string;
3039
};
3140

3241
/**
@@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
8190
projectSlug: project.slug,
8291
envSlug: environment.slug,
8392
initialMessages: agentView.initialMessages,
93+
snapshotPresignedUrl: agentView.snapshotPresignedUrl,
8494
});
8595

8696
// Sticky-bottom auto-scroll: walks up to find the inspector's scroll
@@ -120,13 +130,19 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
120130
* - `kind: "stop"` is a stop signal — no messages, nothing to render
121131
* here, so it's filtered.
122132
*
133+
* Wire payloads are slim-wire (one new UIMessage per record, on
134+
* `payload.message`). The legacy `payload.messages` array shape is kept
135+
* here as a fallback so any historical records on a long-lived session
136+
* still render.
137+
*
123138
* The server wraps records in `{data, id}` and writes `data` as a JSON
124139
* string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
125140
* re-parses to recover the object.
126141
*/
127142
type InputStreamChunk = {
128143
kind?: "message" | "stop";
129144
payload?: {
145+
message?: { id?: string; role?: string; parts?: unknown[] };
130146
messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
131147
trigger?: string;
132148
};
@@ -217,13 +233,15 @@ function useAgentSessionMessages({
217233
projectSlug,
218234
envSlug,
219235
initialMessages,
236+
snapshotPresignedUrl,
220237
}: {
221238
sessionId: string;
222239
apiOrigin: string;
223240
orgSlug: string;
224241
projectSlug: string;
225242
envSlug: string;
226243
initialMessages: UIMessage[];
244+
snapshotPresignedUrl?: string;
227245
}): UIMessage[] {
228246
// Seed with the user messages from the run's task payload.
229247
const seedMessages = useMemo(
@@ -285,27 +303,92 @@ function useAgentSessionMessages({
285303
const outputUrl = `${sessionBase}/out`;
286304
const inputUrl = `${sessionBase}/in`;
287305

306+
/**
307+
* Try to seed `pendingRef` from the agent's S3 snapshot blob and return
308+
* the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes
309+
* just past the snapshot. Returns undefined for sessions that don't
310+
* have a snapshot (e.g. `hydrateMessages` customers, or sessions that
311+
* have never completed a turn).
312+
*/
313+
const loadSnapshot = async (): Promise<string | undefined> => {
314+
if (!snapshotPresignedUrl) return undefined;
315+
try {
316+
const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal });
317+
if (!resp.ok) return undefined;
318+
const json = (await resp.json()) as unknown;
319+
const parsed = ChatSnapshotV1Schema.safeParse(json);
320+
if (!parsed.success) return undefined;
321+
const snapshot = parsed.data;
322+
// Preserve the snapshot's array order in the final render by
323+
// giving each message a unique, monotonically increasing
324+
// timestamp from `(savedAt - count + index)`. Real chunk
325+
// timestamps from the SSE path use S2 arrival ms (positive
326+
// numbers in the present), so anything below `savedAt` sorts
327+
// before live chunks while preserving snapshot order among
328+
// themselves.
329+
const count = snapshot.messages.length;
330+
snapshot.messages.forEach((raw, i) => {
331+
const message = raw as UIMessage;
332+
if (!message?.id) return;
333+
// The snapshot's seed wins over the task-payload seed for any
334+
// overlapping ids (the snapshot represents the agent's
335+
// canonical accumulator, post-turn).
336+
pendingRef.current.set(message.id, message);
337+
if (!timestampsRef.current.has(message.id)) {
338+
timestampsRef.current.set(message.id, snapshot.savedAt - count + i);
339+
}
340+
});
341+
scheduleFlush.current();
342+
return snapshot.lastOutEventId;
343+
} catch {
344+
// 404 / network / parse / abort — fall back to seq=0 SSE
345+
return undefined;
346+
}
347+
};
348+
349+
const outputSubOptions = (lastEventId: string | undefined) =>
350+
({
351+
signal: abort.signal,
352+
timeoutInSeconds: 120,
353+
...(lastEventId !== undefined ? { lastEventId } : {}),
354+
}) as const;
355+
288356
const commonSubOptions = {
289357
signal: abort.signal,
290358
timeoutInSeconds: 120,
291359
} as const;
292360

293361
// ---- Output stream: assistant messages ---------------------------------
294362
//
295-
// The output stream delivers UIMessageChunks interleaved with
296-
// Trigger-specific control chunks (`trigger:turn-complete`, etc.). We
297-
// filter the control chunks and fold everything else into an assistant
298-
// `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's
299-
// `readUIMessageStream` helper is only available in `ai@6`, and the
300-
// webapp is pinned to `ai@4`, so we re-implement just the chunk types
301-
// that `renderPart` actually displays.
363+
// The output stream delivers data records (UIMessageChunks) interleaved
364+
// with Trigger control records (`turn-complete`, `upgrade-required`) and
365+
// S2 command records (`trim`). Control + command records ride on
366+
// `record.headers` with empty bodies; the SSE parser strips S2 command
367+
// records entirely, and control records arrive with `value.chunk ===
368+
// undefined`, which `parseChunkPayload` drops below.
369+
//
370+
// We fold everything else into an assistant `UIMessage` via our own
371+
// `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream`
372+
// helper is only available in `ai@6`, and the webapp is pinned to
373+
// `ai@4`, so we re-implement just the chunk types that `renderPart`
374+
// actually displays.
302375
//
303376
// We capture the **server timestamp of each assistant message's first
304377
// `start` chunk** so later sort-by-timestamp merges with the input
305378
// stream correctly.
306379
const runOutput = async () => {
307380
try {
308-
const sub = new SSEStreamSubscription(outputUrl, commonSubOptions);
381+
// Seed messages from the snapshot first (if available), then
382+
// resume the SSE from the snapshot's last event id so we don't
383+
// re-stream chunks already represented in the snapshot. If no
384+
// snapshot exists (no URL, 404, parse failure), the SSE opens
385+
// at seq=0 — which, post-trim, contains roughly one turn of
386+
// records (acceptable fallback for `hydrateMessages` sessions
387+
// and fresh sessions).
388+
const snapshotLastEventId = await loadSnapshot();
389+
if (abort.signal.aborted) return;
390+
391+
const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId));
309392
const raw = await sub.subscribe();
310393
const reader = raw.getReader();
311394

@@ -318,6 +401,12 @@ function useAgentSessionMessages({
318401

319402
const chunk = parseChunkPayload(value.chunk) as OutputChunk | null;
320403
if (!chunk || typeof chunk.type !== "string") continue;
404+
// Legacy belt-and-suspenders: prior versions of the SDK
405+
// emitted `trigger:turn-complete` / `trigger:upgrade-required`
406+
// as data records (`type` field). Current versions use
407+
// header-form control records, which `parseChunkPayload`
408+
// drops above. Keep this filter to handle any in-flight
409+
// sessions whose `.out` was populated by the older SDK.
321410
if (chunk.type.startsWith("trigger:")) continue;
322411

323412
if (chunk.type === "start") {
@@ -413,9 +502,18 @@ function useAgentSessionMessages({
413502
const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null;
414503
if (!chunk || chunk.kind !== "message") continue;
415504
const payload = chunk.payload;
416-
if (!payload || !Array.isArray(payload.messages)) continue;
417-
418-
const incomingUsers = payload.messages.filter(
505+
if (!payload) continue;
506+
507+
// Slim-wire is one UIMessage on `payload.message`; legacy
508+
// payloads carried an array on `payload.messages`. Accept
509+
// either so historical records on a long-lived session still
510+
// render.
511+
const candidates = Array.isArray(payload.messages)
512+
? payload.messages
513+
: payload.message
514+
? [payload.message]
515+
: [];
516+
const incomingUsers = candidates.filter(
419517
(m): m is UIMessage =>
420518
m != null && (m as { role?: string }).role === "user" && typeof m.id === "string"
421519
);
@@ -454,7 +552,7 @@ function useAgentSessionMessages({
454552
pendingTimerRef.current = null;
455553
}
456554
};
457-
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]);
555+
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]);
458556

459557
return useMemo(() => {
460558
const timestamps = timestampsRef.current;

apps/webapp/app/presenters/v3/SessionPresenter.server.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import { type Span } from "@opentelemetry/api";
2+
import { chatSnapshotKeySuffix } from "@trigger.dev/core/v3";
23
import { type PrismaClientOrTransaction } from "@trigger.dev/database";
34
import { env } from "~/env.server";
45
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
56
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
7+
import { logger } from "~/services/logger.server";
8+
import { generatePresignedUrl } from "~/v3/objectStore.server";
69
import { ServiceValidationError } from "~/v3/services/baseService.server";
710
import { startActiveSpan } from "~/v3/tracer.server";
811

@@ -15,6 +18,8 @@ export class SessionPresenter {
1518
userId: string;
1619
environmentId: string;
1720
sessionParam: string;
21+
projectExternalRef: string;
22+
environmentSlug: string;
1823
}) {
1924
return startActiveSpan(
2025
"SessionPresenter.call",
@@ -33,10 +38,14 @@ export class SessionPresenter {
3338
userId,
3439
environmentId,
3540
sessionParam,
41+
projectExternalRef,
42+
environmentSlug,
3643
}: {
3744
userId: string;
3845
environmentId: string;
3946
sessionParam: string;
47+
projectExternalRef: string;
48+
environmentSlug: string;
4049
},
4150
rootSpan: Span
4251
) {
@@ -112,6 +121,48 @@ export class SessionPresenter {
112121
// unused — kept here to match the existing `AgentViewAuth` shape.
113122
const addressingKey = session.externalId ?? session.friendlyId;
114123

124+
// Presign a GET URL for the agent's S3 snapshot blob. The browser
125+
// fetches it directly, parses + validates, and seeds the
126+
// TriggerChatTransport with the full history + lastEventId before
127+
// opening the SSE. Presign succeeds regardless of whether the blob
128+
// exists; the frontend handles 404 gracefully.
129+
//
130+
// Snapshots are only written when no `hydrateMessages` hook is
131+
// registered — sessions that use `hydrateMessages` will 404 here
132+
// and the dashboard falls back to seq=0 SSE (which, post-trim,
133+
// shows only the most recent turn — accepted, those customers
134+
// have their own DB-backed dashboards).
135+
// The agent writes snapshots keyed on the session's friendlyId (the
136+
// `session_*` form), which matches what the SDK's `chat.agent` payload
137+
// carries as `sessionId`. Use the same key shape here so the dashboard
138+
// hits the same S3 object.
139+
let snapshotPresignedUrl: string | undefined;
140+
try {
141+
const signed = await startActiveSpan(
142+
"SessionPresenter.presignSnapshot",
143+
async () =>
144+
generatePresignedUrl(
145+
projectExternalRef,
146+
environmentSlug,
147+
chatSnapshotKeySuffix(session.friendlyId),
148+
"GET"
149+
)
150+
);
151+
if (signed.success) {
152+
snapshotPresignedUrl = signed.url;
153+
} else {
154+
logger.warn("SessionPresenter: snapshot presign failed", {
155+
sessionId: session.id,
156+
error: signed.error,
157+
});
158+
}
159+
} catch (error) {
160+
logger.warn("SessionPresenter: snapshot presign threw", {
161+
sessionId: session.id,
162+
error: error instanceof Error ? error.message : String(error),
163+
});
164+
}
165+
115166
return {
116167
id: session.id,
117168
friendlyId: session.friendlyId,
@@ -147,6 +198,7 @@ export class SessionPresenter {
147198
apiOrigin: env.API_ORIGIN || env.LOGIN_ORIGIN,
148199
sessionId: addressingKey,
149200
initialMessages: [],
201+
snapshotPresignedUrl,
150202
},
151203
};
152204
}

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
7979
userId,
8080
environmentId: environment.id,
8181
sessionParam,
82+
projectExternalRef: project.externalRef,
83+
environmentSlug: environment.slug,
8284
});
8385

8486
if (!session) {

0 commit comments

Comments
 (0)