Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 111 additions & 13 deletions apps/webapp/app/components/runs/v3/agent/AgentView.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { UIMessage } from "@ai-sdk/react";
import { SSEStreamSubscription } from "@trigger.dev/core/v3";
import { ChatSnapshotV1Schema, SSEStreamSubscription } from "@trigger.dev/core/v3";
import { useEffect, useMemo, useRef, useState } from "react";
import { Paragraph } from "~/components/primitives/Paragraph";
import { Spinner } from "~/components/primitives/Spinner";
Expand Down Expand Up @@ -27,6 +27,15 @@ export type AgentViewAuth = {
* channel and is merged in by the AgentView subscription.
*/
initialMessages: UIMessage[];
/**
* Presigned GET URL for the session's chat-snapshot S3 blob (written
* by the agent after each turn-complete; see `ChatSnapshotV1`).
* Optional — sessions that registered a `hydrateMessages` hook skip
* snapshot writes and the URL fetch will 404. In that case the
* dashboard falls back to seq=0 SSE (which, post-trim, shows only the
* most recent turn). Generated server-side by `SessionPresenter`.
*/
snapshotPresignedUrl?: string;
};

/**
Expand Down Expand Up @@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
projectSlug: project.slug,
envSlug: environment.slug,
initialMessages: agentView.initialMessages,
snapshotPresignedUrl: agentView.snapshotPresignedUrl,
});

// Sticky-bottom auto-scroll: walks up to find the inspector's scroll
Expand Down Expand Up @@ -120,13 +130,19 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
* - `kind: "stop"` is a stop signal — no messages, nothing to render
* here, so it's filtered.
*
* Wire payloads are slim-wire (one new UIMessage per record, on
* `payload.message`). The legacy `payload.messages` array shape is kept
* here as a fallback so any historical records on a long-lived session
* still render.
*
* The server wraps records in `{data, id}` and writes `data` as a JSON
* string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
* re-parses to recover the object.
*/
type InputStreamChunk = {
kind?: "message" | "stop";
payload?: {
message?: { id?: string; role?: string; parts?: unknown[] };
messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
trigger?: string;
};
Expand Down Expand Up @@ -217,13 +233,15 @@ function useAgentSessionMessages({
projectSlug,
envSlug,
initialMessages,
snapshotPresignedUrl,
}: {
sessionId: string;
apiOrigin: string;
orgSlug: string;
projectSlug: string;
envSlug: string;
initialMessages: UIMessage[];
snapshotPresignedUrl?: string;
}): UIMessage[] {
// Seed with the user messages from the run's task payload.
const seedMessages = useMemo(
Expand Down Expand Up @@ -285,27 +303,92 @@ function useAgentSessionMessages({
const outputUrl = `${sessionBase}/out`;
const inputUrl = `${sessionBase}/in`;

/**
* Try to seed `pendingRef` from the agent's S3 snapshot blob and return
* the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes
* just past the snapshot. Returns undefined for sessions that don't
* have a snapshot (e.g. `hydrateMessages` customers, or sessions that
* have never completed a turn).
*/
const loadSnapshot = async (): Promise<string | undefined> => {
if (!snapshotPresignedUrl) return undefined;
try {
const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal });
if (!resp.ok) return undefined;
const json = (await resp.json()) as unknown;
const parsed = ChatSnapshotV1Schema.safeParse(json);
if (!parsed.success) return undefined;
const snapshot = parsed.data;
// Preserve the snapshot's array order in the final render by
// giving each message a unique, monotonically increasing
// timestamp from `(savedAt - count + index)`. Real chunk
// timestamps from the SSE path use S2 arrival ms (positive
// numbers in the present), so anything below `savedAt` sorts
// before live chunks while preserving snapshot order among
// themselves.
const count = snapshot.messages.length;
snapshot.messages.forEach((raw, i) => {
const message = raw as UIMessage;
if (!message?.id) return;
// The snapshot's seed wins over the task-payload seed for any
// overlapping ids (the snapshot represents the agent's
// canonical accumulator, post-turn).
pendingRef.current.set(message.id, message);
if (!timestampsRef.current.has(message.id)) {
timestampsRef.current.set(message.id, snapshot.savedAt - count + i);
}
});
scheduleFlush.current();
return snapshot.lastOutEventId;
} catch {
// 404 / network / parse / abort — fall back to seq=0 SSE
return undefined;
}
};

const outputSubOptions = (lastEventId: string | undefined) =>
({
signal: abort.signal,
timeoutInSeconds: 120,
...(lastEventId !== undefined ? { lastEventId } : {}),
}) as const;

const commonSubOptions = {
signal: abort.signal,
timeoutInSeconds: 120,
} as const;

// ---- Output stream: assistant messages ---------------------------------
//
// The output stream delivers UIMessageChunks interleaved with
// Trigger-specific control chunks (`trigger:turn-complete`, etc.). We
// filter the control chunks and fold everything else into an assistant
// `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's
// `readUIMessageStream` helper is only available in `ai@6`, and the
// webapp is pinned to `ai@4`, so we re-implement just the chunk types
// that `renderPart` actually displays.
// The output stream delivers data records (UIMessageChunks) interleaved
// with Trigger control records (`turn-complete`, `upgrade-required`) and
// S2 command records (`trim`). Control + command records ride on
// `record.headers` with empty bodies; the SSE parser strips S2 command
// records entirely, and control records arrive with `value.chunk ===
// undefined`, which `parseChunkPayload` drops below.
//
// We fold everything else into an assistant `UIMessage` via our own
// `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream`
// helper is only available in `ai@6`, and the webapp is pinned to
// `ai@4`, so we re-implement just the chunk types that `renderPart`
// actually displays.
//
// We capture the **server timestamp of each assistant message's first
// `start` chunk** so later sort-by-timestamp merges with the input
// stream correctly.
const runOutput = async () => {
try {
const sub = new SSEStreamSubscription(outputUrl, commonSubOptions);
// Seed messages from the snapshot first (if available), then
// resume the SSE from the snapshot's last event id so we don't
// re-stream chunks already represented in the snapshot. If no
// snapshot exists (no URL, 404, parse failure), the SSE opens
// at seq=0 — which, post-trim, contains roughly one turn of
// records (acceptable fallback for `hydrateMessages` sessions
// and fresh sessions).
const snapshotLastEventId = await loadSnapshot();
if (abort.signal.aborted) return;

const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId));
const raw = await sub.subscribe();
const reader = raw.getReader();

Expand All @@ -318,6 +401,12 @@ function useAgentSessionMessages({

const chunk = parseChunkPayload(value.chunk) as OutputChunk | null;
if (!chunk || typeof chunk.type !== "string") continue;
// Legacy belt-and-suspenders: prior versions of the SDK
// emitted `trigger:turn-complete` / `trigger:upgrade-required`
// as data records (`type` field). Current versions use
// header-form control records, which `parseChunkPayload`
// drops above. Keep this filter to handle any in-flight
// sessions whose `.out` was populated by the older SDK.
if (chunk.type.startsWith("trigger:")) continue;

if (chunk.type === "start") {
Expand Down Expand Up @@ -413,9 +502,18 @@ function useAgentSessionMessages({
const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null;
if (!chunk || chunk.kind !== "message") continue;
const payload = chunk.payload;
if (!payload || !Array.isArray(payload.messages)) continue;

const incomingUsers = payload.messages.filter(
if (!payload) continue;

// Slim-wire is one UIMessage on `payload.message`; legacy
// payloads carried an array on `payload.messages`. Accept
// either so historical records on a long-lived session still
// render.
const candidates = Array.isArray(payload.messages)
? payload.messages
: payload.message
? [payload.message]
: [];
const incomingUsers = candidates.filter(
(m): m is UIMessage =>
m != null && (m as { role?: string }).role === "user" && typeof m.id === "string"
);
Expand Down Expand Up @@ -454,7 +552,7 @@ function useAgentSessionMessages({
pendingTimerRef.current = null;
}
};
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]);
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]);

return useMemo(() => {
const timestamps = timestampsRef.current;
Expand Down
52 changes: 52 additions & 0 deletions apps/webapp/app/presenters/v3/SessionPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { type Span } from "@opentelemetry/api";
import { chatSnapshotKeySuffix } from "@trigger.dev/core/v3";
import { type PrismaClientOrTransaction } from "@trigger.dev/database";
import { env } from "~/env.server";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import { logger } from "~/services/logger.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { startActiveSpan } from "~/v3/tracer.server";

Expand All @@ -15,6 +18,8 @@ export class SessionPresenter {
userId: string;
environmentId: string;
sessionParam: string;
projectExternalRef: string;
environmentSlug: string;
}) {
return startActiveSpan(
"SessionPresenter.call",
Expand All @@ -33,10 +38,14 @@ export class SessionPresenter {
userId,
environmentId,
sessionParam,
projectExternalRef,
environmentSlug,
}: {
userId: string;
environmentId: string;
sessionParam: string;
projectExternalRef: string;
environmentSlug: string;
},
rootSpan: Span
) {
Expand Down Expand Up @@ -112,6 +121,48 @@ export class SessionPresenter {
// unused — kept here to match the existing `AgentViewAuth` shape.
const addressingKey = session.externalId ?? session.friendlyId;

// Presign a GET URL for the agent's S3 snapshot blob. The browser
// fetches it directly, parses + validates, and seeds the
// TriggerChatTransport with the full history + lastEventId before
// opening the SSE. Presign succeeds regardless of whether the blob
// exists; the frontend handles 404 gracefully.
//
// Snapshots are only written when no `hydrateMessages` hook is
// registered — sessions that use `hydrateMessages` will 404 here
// and the dashboard falls back to seq=0 SSE (which, post-trim,
// shows only the most recent turn — accepted, those customers
// have their own DB-backed dashboards).
// The agent writes snapshots keyed on the session's friendlyId (the
// `session_*` form), which matches what the SDK's `chat.agent` payload
// carries as `sessionId`. Use the same key shape here so the dashboard
// hits the same S3 object.
let snapshotPresignedUrl: string | undefined;
try {
const signed = await startActiveSpan(
"SessionPresenter.presignSnapshot",
async () =>
generatePresignedUrl(
projectExternalRef,
environmentSlug,
chatSnapshotKeySuffix(session.friendlyId),
"GET"
)
);
if (signed.success) {
snapshotPresignedUrl = signed.url;
} else {
logger.warn("SessionPresenter: snapshot presign failed", {
sessionId: session.id,
error: signed.error,
});
}
} catch (error) {
logger.warn("SessionPresenter: snapshot presign threw", {
sessionId: session.id,
error: error instanceof Error ? error.message : String(error),
});
}

return {
id: session.id,
friendlyId: session.friendlyId,
Expand Down Expand Up @@ -147,6 +198,7 @@ export class SessionPresenter {
apiOrigin: env.API_ORIGIN || env.LOGIN_ORIGIN,
sessionId: addressingKey,
initialMessages: [],
snapshotPresignedUrl,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
userId,
environmentId: environment.id,
sessionParam,
projectExternalRef: project.externalRef,
environmentSlug: environment.slug,
});

if (!session) {
Expand Down
Loading
Loading