11import type { UIMessage } from "@ai-sdk/react" ;
2- import { SSEStreamSubscription } from "@trigger.dev/core/v3" ;
2+ import { ChatSnapshotV1Schema , SSEStreamSubscription } from "@trigger.dev/core/v3" ;
33import { useEffect , useMemo , useRef , useState } from "react" ;
44import { Paragraph } from "~/components/primitives/Paragraph" ;
55import { Spinner } from "~/components/primitives/Spinner" ;
@@ -27,6 +27,15 @@ export type AgentViewAuth = {
2727 * channel and is merged in by the AgentView subscription.
2828 */
2929 initialMessages : UIMessage [ ] ;
30+ /**
31+ * Presigned GET URL for the session's chat-snapshot S3 blob (written
32+ * by the agent after each turn-complete; see `ChatSnapshotV1`).
33+ * Optional — sessions that registered a `hydrateMessages` hook skip
34+ * snapshot writes and the URL fetch will 404. In that case the
35+ * dashboard falls back to seq=0 SSE (which, post-trim, shows only the
36+ * most recent turn). Generated server-side by `SessionPresenter`.
37+ */
38+ snapshotPresignedUrl ?: string ;
3039} ;
3140
3241/**
@@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
8190 projectSlug : project . slug ,
8291 envSlug : environment . slug ,
8392 initialMessages : agentView . initialMessages ,
93+ snapshotPresignedUrl : agentView . snapshotPresignedUrl ,
8494 } ) ;
8595
8696 // Sticky-bottom auto-scroll: walks up to find the inspector's scroll
@@ -120,13 +130,19 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
120130 * - `kind: "stop"` is a stop signal — no messages, nothing to render
121131 * here, so it's filtered.
122132 *
133+ * Wire payloads are slim-wire (one new UIMessage per record, on
134+ * `payload.message`). The legacy `payload.messages` array shape is kept
135+ * here as a fallback so any historical records on a long-lived session
136+ * still render.
137+ *
123138 * The server wraps records in `{data, id}` and writes `data` as a JSON
124139 * string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
125140 * re-parses to recover the object.
126141 */
127142type InputStreamChunk = {
128143 kind ?: "message" | "stop" ;
129144 payload ?: {
145+ message ?: { id ?: string ; role ?: string ; parts ?: unknown [ ] } ;
130146 messages ?: Array < { id ?: string ; role ?: string ; parts ?: unknown [ ] } > ;
131147 trigger ?: string ;
132148 } ;
@@ -217,13 +233,15 @@ function useAgentSessionMessages({
217233 projectSlug,
218234 envSlug,
219235 initialMessages,
236+ snapshotPresignedUrl,
220237} : {
221238 sessionId : string ;
222239 apiOrigin : string ;
223240 orgSlug : string ;
224241 projectSlug : string ;
225242 envSlug : string ;
226243 initialMessages : UIMessage [ ] ;
244+ snapshotPresignedUrl ?: string ;
227245} ) : UIMessage [ ] {
228246 // Seed with the user messages from the run's task payload.
229247 const seedMessages = useMemo (
@@ -285,27 +303,92 @@ function useAgentSessionMessages({
285303 const outputUrl = `${ sessionBase } /out` ;
286304 const inputUrl = `${ sessionBase } /in` ;
287305
306+ /**
307+ * Try to seed `pendingRef` from the agent's S3 snapshot blob and return
308+ * the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes
309+ * just past the snapshot. Returns undefined for sessions that don't
310+ * have a snapshot (e.g. `hydrateMessages` customers, or sessions that
311+ * have never completed a turn).
312+ */
313+ const loadSnapshot = async ( ) : Promise < string | undefined > => {
314+ if ( ! snapshotPresignedUrl ) return undefined ;
315+ try {
316+ const resp = await fetch ( snapshotPresignedUrl , { signal : abort . signal } ) ;
317+ if ( ! resp . ok ) return undefined ;
318+ const json = ( await resp . json ( ) ) as unknown ;
319+ const parsed = ChatSnapshotV1Schema . safeParse ( json ) ;
320+ if ( ! parsed . success ) return undefined ;
321+ const snapshot = parsed . data ;
322+ // Preserve the snapshot's array order in the final render by
323+ // giving each message a unique, monotonically increasing
324+ // timestamp from `(savedAt - count + index)`. Real chunk
325+ // timestamps from the SSE path use S2 arrival ms (positive
326+ // numbers in the present), so anything below `savedAt` sorts
327+ // before live chunks while preserving snapshot order among
328+ // themselves.
329+ const count = snapshot . messages . length ;
330+ snapshot . messages . forEach ( ( raw , i ) => {
331+ const message = raw as UIMessage ;
332+ if ( ! message ?. id ) return ;
333+ // The snapshot's seed wins over the task-payload seed for any
334+ // overlapping ids (the snapshot represents the agent's
335+ // canonical accumulator, post-turn).
336+ pendingRef . current . set ( message . id , message ) ;
337+ if ( ! timestampsRef . current . has ( message . id ) ) {
338+ timestampsRef . current . set ( message . id , snapshot . savedAt - count + i ) ;
339+ }
340+ } ) ;
341+ scheduleFlush . current ( ) ;
342+ return snapshot . lastOutEventId ;
343+ } catch {
344+ // 404 / network / parse / abort — fall back to seq=0 SSE
345+ return undefined ;
346+ }
347+ } ;
348+
349+ const outputSubOptions = ( lastEventId : string | undefined ) =>
350+ ( {
351+ signal : abort . signal ,
352+ timeoutInSeconds : 120 ,
353+ ...( lastEventId !== undefined ? { lastEventId } : { } ) ,
354+ } ) as const ;
355+
288356 const commonSubOptions = {
289357 signal : abort . signal ,
290358 timeoutInSeconds : 120 ,
291359 } as const ;
292360
293361 // ---- Output stream: assistant messages ---------------------------------
294362 //
295- // The output stream delivers UIMessageChunks interleaved with
296- // Trigger-specific control chunks (`trigger:turn-complete`, etc.). We
297- // filter the control chunks and fold everything else into an assistant
298- // `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's
299- // `readUIMessageStream` helper is only available in `ai@6`, and the
300- // webapp is pinned to `ai@4`, so we re-implement just the chunk types
301- // that `renderPart` actually displays.
363+ // The output stream delivers data records (UIMessageChunks) interleaved
364+ // with Trigger control records (`turn-complete`, `upgrade-required`) and
365+ // S2 command records (`trim`). Control + command records ride on
366+ // `record.headers` with empty bodies; the SSE parser strips S2 command
367+ // records entirely, and control records arrive with `value.chunk ===
368+ // undefined`, which `parseChunkPayload` drops below.
369+ //
370+ // We fold everything else into an assistant `UIMessage` via our own
371+ // `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream`
372+ // helper is only available in `ai@6`, and the webapp is pinned to
373+ // `ai@4`, so we re-implement just the chunk types that `renderPart`
374+ // actually displays.
302375 //
303376 // We capture the **server timestamp of each assistant message's first
304377 // `start` chunk** so later sort-by-timestamp merges with the input
305378 // stream correctly.
306379 const runOutput = async ( ) => {
307380 try {
308- const sub = new SSEStreamSubscription ( outputUrl , commonSubOptions ) ;
381+ // Seed messages from the snapshot first (if available), then
382+ // resume the SSE from the snapshot's last event id so we don't
383+ // re-stream chunks already represented in the snapshot. If no
384+ // snapshot exists (no URL, 404, parse failure), the SSE opens
385+ // at seq=0 — which, post-trim, contains roughly one turn of
386+ // records (acceptable fallback for `hydrateMessages` sessions
387+ // and fresh sessions).
388+ const snapshotLastEventId = await loadSnapshot ( ) ;
389+ if ( abort . signal . aborted ) return ;
390+
391+ const sub = new SSEStreamSubscription ( outputUrl , outputSubOptions ( snapshotLastEventId ) ) ;
309392 const raw = await sub . subscribe ( ) ;
310393 const reader = raw . getReader ( ) ;
311394
@@ -318,6 +401,12 @@ function useAgentSessionMessages({
318401
319402 const chunk = parseChunkPayload ( value . chunk ) as OutputChunk | null ;
320403 if ( ! chunk || typeof chunk . type !== "string" ) continue ;
404+ // Legacy belt-and-suspenders: prior versions of the SDK
405+ // emitted `trigger:turn-complete` / `trigger:upgrade-required`
406+ // as data records (`type` field). Current versions use
407+ // header-form control records, which `parseChunkPayload`
408+ // drops above. Keep this filter to handle any in-flight
409+ // sessions whose `.out` was populated by the older SDK.
321410 if ( chunk . type . startsWith ( "trigger:" ) ) continue ;
322411
323412 if ( chunk . type === "start" ) {
@@ -413,9 +502,18 @@ function useAgentSessionMessages({
413502 const chunk = parseChunkPayload ( value . chunk ) as InputStreamChunk | null ;
414503 if ( ! chunk || chunk . kind !== "message" ) continue ;
415504 const payload = chunk . payload ;
416- if ( ! payload || ! Array . isArray ( payload . messages ) ) continue ;
417-
418- const incomingUsers = payload . messages . filter (
505+ if ( ! payload ) continue ;
506+
507+ // Slim-wire is one UIMessage on `payload.message`; legacy
508+ // payloads carried an array on `payload.messages`. Accept
509+ // either so historical records on a long-lived session still
510+ // render.
511+ const candidates = Array . isArray ( payload . messages )
512+ ? payload . messages
513+ : payload . message
514+ ? [ payload . message ]
515+ : [ ] ;
516+ const incomingUsers = candidates . filter (
419517 ( m ) : m is UIMessage =>
420518 m != null && ( m as { role ?: string } ) . role === "user" && typeof m . id === "string"
421519 ) ;
@@ -454,7 +552,7 @@ function useAgentSessionMessages({
454552 pendingTimerRef . current = null ;
455553 }
456554 } ;
457- } , [ sessionId , apiOrigin , orgSlug , projectSlug , envSlug ] ) ;
555+ } , [ sessionId , apiOrigin , orgSlug , projectSlug , envSlug , snapshotPresignedUrl ] ) ;
458556
459557 return useMemo ( ( ) => {
460558 const timestamps = timestampsRef . current ;
0 commit comments