31 changes: 31 additions & 0 deletions .changeset/chat-slim-wire-merge.md
@@ -0,0 +1,31 @@
---
"@trigger.dev/sdk": patch
---

Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together:

- The per-turn merge now overlays only the wire copy's tool-part state advancement onto the agent's existing chain: `state` plus the matching resolution field (`output` / `errorText` / `approval`) come from the wire, and everything else (text, reasoning, tool `input`, provider metadata) stays as the snapshot or `hydrateMessages` returned it. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow).
- `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. For `submit-message` continuations the wire payload is just `{ id, role, parts }`, where each part keeps only `type`, `toolCallId`, `state`, and the resolution field (plus `toolName` for dynamic tools); all other messages pass through unchanged. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops.
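
To make the overlay concrete, here is a minimal sketch of the idea, not the SDK's implementation. The `SketchPart` / `SketchMessage` shapes and the `overlayToolState` function are hypothetical stand-ins (the real merge is `mergeIncomingIntoHydrated` in `ai.ts`, which this changeset does not show), and matching parts by `toolCallId` is an assumption.

```typescript
// Hypothetical, simplified shapes; the real message type is `UIMessage` from `ai`.
type SketchPart = {
  type: string;
  toolCallId?: string;
  state?: string;
  input?: unknown;
  output?: unknown;
  errorText?: string;
  approval?: unknown;
  text?: string;
};
type SketchMessage = { id: string; role: "user" | "assistant"; parts: SketchPart[] };

// States the client is allowed to advance over the wire (listed in the changeset).
const ADVANCED_STATES = new Set([
  "output-available",
  "output-error",
  "approval-responded",
  "output-denied",
]);

// Hypothetical stand-in for the agent-side merge: copy only `state` and the
// resolution fields from the slim wire part onto the matching hydrated part.
function overlayToolState(hydrated: SketchMessage, wire: SketchMessage): SketchMessage {
  const advanced = new Map<string, SketchPart>();
  for (const p of wire.parts) {
    if (p.toolCallId !== undefined && p.state !== undefined && ADVANCED_STATES.has(p.state)) {
      advanced.set(p.toolCallId, p);
    }
  }
  const parts = hydrated.parts.map((p) => {
    const w = p.toolCallId !== undefined ? advanced.get(p.toolCallId) : undefined;
    if (!w) return p; // text, reasoning, and untouched tool parts stay as hydrated
    const merged: SketchPart = { ...p, state: w.state };
    if (w.output !== undefined) merged.output = w.output;
    if (w.errorText !== undefined) merged.errorText = w.errorText;
    if (w.approval !== undefined) merged.approval = w.approval;
    return merged;
  });
  return { ...hydrated, parts };
}

// Server-side copy: full reasoning and tool input, tool still waiting on the user.
const hydratedAssistant: SketchMessage = {
  id: "a1",
  role: "assistant",
  parts: [
    { type: "reasoning", text: "thinking..." },
    { type: "tool-askUser", toolCallId: "tc1", state: "input-available", input: { question: "Proceed?" } },
  ],
};

// Slim wire copy: same message id, only the advanced tool part.
const wireAssistant: SketchMessage = {
  id: "a1",
  role: "assistant",
  parts: [{ type: "tool-askUser", toolCallId: "tc1", state: "output-available", output: "yes" }],
};

const mergedAssistant = overlayToolState(hydratedAssistant, wireAssistant);
```

In this sketch the merged tool part ends up with the wire's `state` and `output` while keeping the hydrated `input`, which is the property the changeset relies on to avoid sending a tool call with no arguments to the model.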

Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern.
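
As a rough illustration of the "filter to user messages" option, the sketch below selects which messages to hand to a strict validator. `selectMessagesToValidate` and the `IncomingMessage` shape are hypothetical names introduced here; the hook's actual signature and the recommended pattern live in the `onValidateMessages` docstring, which is not reproduced in this changeset.

```typescript
// Hypothetical minimal message shape; the real type is `UIMessage` from `ai`.
type IncomingMessage = {
  id: string;
  role: "system" | "user" | "assistant";
  parts: unknown[];
};

// Keep only user messages, so a slim assistant copy (no text, reasoning, or
// tool `input`) is never passed to a validator that expects full messages.
function selectMessagesToValidate<T extends { role: string }>(messages: T[]): T[] {
  return messages.filter((m) => m.role === "user");
}

// A HITL continuation as it might arrive: the assistant entry is the slim wire copy.
const incomingOnHitlTurn: IncomingMessage[] = [
  { id: "u1", role: "user", parts: [{ type: "text", text: "Deploy it" }] },
  {
    id: "a1",
    role: "assistant",
    parts: [{ type: "tool-confirm", toolCallId: "tc1", state: "output-available", output: "yes" }],
  },
];

// Only `u1` would be handed on to a validator such as `validateUIMessages`.
const toValidate = selectMessagesToValidate(incomingOnHitlTurn);
```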

For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface:

```ts
import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";

chat.agent({
  hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
    const record = await db.chat.findUnique({ where: { id: chatId } });
    const stored = record?.messages ?? [];
    if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
      await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
    }
    return stored;
  },
});
```

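`upsertIncomingMessage` looks only at the last incoming message. It pushes that message when its id is not already in `stored` (typically a fresh user message), no-ops on HITL continuations (the incoming message shares an id with the existing assistant message, and the runtime overlays the new tool-state advance), and no-ops on non-`submit-message` triggers. It mutates `stored` in place and returns `true` only if it pushed, so the caller knows whether to persist.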
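
The three outcomes can be exercised in isolation. The sketch below inlines a local copy of the helper's logic from this diff so it runs without the SDK; `StoredMessage` is a simplified, hypothetical stand-in for `UIMessage`, and the message ids are made up for illustration.

```typescript
// Simplified stand-in for `UIMessage`, for illustration only.
type StoredMessage = { id: string; role: "user" | "assistant"; parts: unknown[] };
type Trigger = "submit-message" | "regenerate-message" | "action";

// Local copy of the helper's logic so the example is self-contained.
function upsertIncomingMessage(
  stored: StoredMessage[],
  event: { trigger: Trigger; incomingMessages: StoredMessage[] }
): boolean {
  if (event.trigger !== "submit-message") return false;
  const newMsg = event.incomingMessages[event.incomingMessages.length - 1];
  if (!newMsg) return false;
  if (newMsg.id && stored.some((m) => m.id === newMsg.id)) return false;
  stored.push(newMsg);
  return true;
}

const chain: StoredMessage[] = [
  { id: "u1", role: "user", parts: [] },
  { id: "a1", role: "assistant", parts: [] },
];

// 1. Fresh user message: id not in the chain, so it is pushed.
const pushedFresh = upsertIncomingMessage(chain, {
  trigger: "submit-message",
  incomingMessages: [{ id: "u2", role: "user", parts: [] }],
});

// 2. HITL continuation: id "a1" already exists, so the chain is left alone.
const pushedHitl = upsertIncomingMessage(chain, {
  trigger: "submit-message",
  incomingMessages: [{ id: "a1", role: "assistant", parts: [] }],
});

// 3. Non-submit trigger: always a no-op.
const pushedRegenerate = upsertIncomingMessage(chain, {
  trigger: "regenerate-message",
  incomingMessages: [],
});
```

Only the first call mutates `chain`, so only that turn needs a database write.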

Net effect: calling `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer exceeds the `.in/append` cap and no longer corrupts the LLM input.
148 changes: 148 additions & 0 deletions packages/trigger-sdk/src/v3/ai-shared.ts
@@ -198,3 +198,151 @@ export type InferChatUIMessage<TTask extends AnyTask> = TTask extends Task<
>
? TUIM
: UIMessage;

/**
* Upsert an incoming wire message into the customer's DB-backed chain
* inside a `hydrateMessages` hook. Returns `true` iff the chain was
* mutated (the caller should persist).
*
* Handles the three cases that matter:
*
* - **Non-submit-message trigger** (`regenerate-message` / `action`,
* or `submit-message` with no incoming): no-op. Returns `false`.
* - **Incoming id already in `stored`** (HITL `addToolOutput` /
* `addToolApproveResponse` continuation — the wire carries the
* existing assistant's id with a slim resolution payload): no-op.
* The runtime's per-turn merge overlays the new tool-state advance
* onto the existing entry; pushing again would duplicate the row
* in the chain you return, and the duplicate slim copy would hit
* `toModelMessages` with no `input`. Returns `false`.
* - **Incoming id not in `stored`** (typically a fresh user message
* on a new turn): push. Returns `true`.
*
* Mutates `stored` in place. The caller persists `stored`, not the
* return value.
*
* @example
* ```ts
* import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";
*
* chat.agent({
* hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
* const record = await db.chat.findUnique({ where: { id: chatId } });
* const stored = record?.messages ?? [];
* if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
* await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
* }
* return stored;
* },
* });
* ```
*/
export function upsertIncomingMessage<TMsg extends UIMessage = UIMessage>(
  stored: TMsg[],
  event: {
    trigger: "submit-message" | "regenerate-message" | "action";
    incomingMessages: TMsg[];
  }
): boolean {
  if (event.trigger !== "submit-message") return false;
  if (event.incomingMessages.length === 0) return false;
  const newMsg = event.incomingMessages[event.incomingMessages.length - 1];
  if (!newMsg) return false;
  if (newMsg.id) {
    const existingIdx = stored.findIndex((m) => m.id === newMsg.id);
    if (existingIdx !== -1) return false;
  }
  stored.push(newMsg);
  return true;
}

/**
* Tool-part states that the client advances and ships back over the wire.
* Covers HITL `addToolOutput` (output-available / output-error) and the
* approval flow (approval-responded / output-denied). `input-streaming` /
* `input-available` / `approval-requested` are server-emitted only — if
* we see them on the wire we treat them as no-ops and skip the slim/merge.
*/
function isWireAdvanceableToolState(
  state: unknown
): state is "output-available" | "output-error" | "approval-responded" | "output-denied" {
  return (
    state === "output-available" ||
    state === "output-error" ||
    state === "approval-responded" ||
    state === "output-denied"
  );
}

/** Whether a tool-UI part is a static (`tool-${name}`) or dynamic tool. */
function isToolPartType(type: unknown): boolean {
  return typeof type === "string" && (type.startsWith("tool-") || type === "dynamic-tool");
}

/**
* Slim an outgoing assistant message before it ships on `submit-message`.
*
* When the client calls `addToolOutput(...)` to resolve a HITL tool (or
* `addToolApproveResponse(...)` to approve/deny one), the AI SDK turns
* it into a `submit-message` whose `messages.at(-1)` is the existing
* assistant message with the new state stitched onto a single tool
* part. On a reasoning-heavy multi-step turn, that full assistant
* message can be 600 KB – 1 MB (encrypted reasoning blobs, reasoning
* text, full tool `input` JSON, prior tool outputs) — well over the
* `.in/append` cap.
*
* The agent runtime only consumes the wire-advanced fields of those
* tool parts (state + output / errorText / approval). Everything else
* (text, reasoning, tool `input`) is rebuilt server-side from the
* durable snapshot or `hydrateMessages`. So we drop everything but
* the advanced tool parts here, and reduce those to just the fields
* the server overlays.
*
* The slim only fires when the assistant message carries at least one
* wire-advanceable tool part. Plain assistant resends (no resolved /
* approval-responded tool) and non-assistant messages pass through
* untouched.
*
* Pairs with the per-turn merge on the agent side
* (`mergeIncomingIntoHydrated` in `ai.ts`).
*/
export function slimSubmitMessageForWire<TMsg extends UIMessage | undefined>(
  message: TMsg
): TMsg {
  if (!message) return message;
  if (message.role !== "assistant") return message;
  const parts = (message.parts ?? []) as any[];
  const advancedToolParts = parts.filter(
    (p) =>
      p &&
      typeof p === "object" &&
      isToolPartType(p.type) &&
      isWireAdvanceableToolState(p.state)
  );
  if (advancedToolParts.length === 0) return message;
  const slimParts = advancedToolParts.map((p: any) => {
    const base: Record<string, unknown> = {
      type: p.type,
      toolCallId: p.toolCallId,
      state: p.state,
    };
    if (p.type === "dynamic-tool" && typeof p.toolName === "string") {
      base.toolName = p.toolName;
    }
    if (p.state === "output-available") {
      base.output = p.output;
      if (p.approval !== undefined) base.approval = p.approval;
    } else if (p.state === "output-error") {
      if (p.errorText !== undefined) base.errorText = p.errorText;
      if (p.approval !== undefined) base.approval = p.approval;
    } else if (p.state === "approval-responded" || p.state === "output-denied") {
      if (p.approval !== undefined) base.approval = p.approval;
    }
    return base;
  });
  return {
    id: message.id,
    role: message.role,
    parts: slimParts,
  } as unknown as TMsg;
}