From c4bc008c6caceaad137a37dd346e079123fdbd7f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 16 Jun 2026 12:25:15 +0100 Subject: [PATCH 1/2] feat(sdk): chat.headStart handover for customAgent and createSession chat.headStart previously handed the warm step-1 response over only to chat.agent. Now chat.customAgent (via conversation.consumeHandover) and chat.createSession (via turn.handover) consume it too, and a resumed tool round merges into the handed-over assistant instead of throwing or duplicating it. Also adds triggerConfig support to chat.headStart() and chat.openSession() so the handover-prepare run inherits tags, queue, and other session run options. Co-authored-by: saasjesus --- .changeset/chat-headstart-custom-backends.md | 26 ++ .changeset/chat-headstart-trigger-config.md | 15 + packages/trigger-sdk/src/v3/ai.ts | 253 ++++++++++- .../trigger-sdk/src/v3/chat-server.test.ts | 61 +++ packages/trigger-sdk/src/v3/chat-server.ts | 54 ++- .../src/v3/createStartSessionAction.test.ts | 36 ++ .../test/chatHandoverBackends.test.ts | 414 ++++++++++++++++++ 7 files changed, 825 insertions(+), 34 deletions(-) create mode 100644 .changeset/chat-headstart-custom-backends.md create mode 100644 .changeset/chat-headstart-trigger-config.md create mode 100644 packages/trigger-sdk/test/chatHandoverBackends.test.ts diff --git a/.changeset/chat-headstart-custom-backends.md b/.changeset/chat-headstart-custom-backends.md new file mode 100644 index 00000000000..d5969bd2d8f --- /dev/null +++ b/.changeset/chat-headstart-custom-backends.md @@ -0,0 +1,26 @@ +--- +"@trigger.dev/sdk": patch +--- + +`chat.headStart` now works with the `chat.customAgent` and `chat.createSession` backends, not only `chat.agent`. The warm step-1 response hands over to your loop the same way it does for a managed agent. + +In a `chat.customAgent` loop, consume the handover on turn 0: + +```ts +const conversation = new chat.MessageAccumulator(); +const { isFinal, skipped } = await conversation.consumeHandover({ payload }); +if (skipped) return; // warm handler aborted, so exit without a turn +if (isFinal) { + await chat.writeTurnComplete(); // step 1 is the response, no streamText +} else { + const result = streamText({ model, messages: conversation.modelMessages, tools }); + // Pass originalMessages so the handed-over tool round merges into the + // step-1 assistant instead of starting a new message. + const response = await chat.pipeAndCapture(result, { + originalMessages: conversation.uiMessages, + }); + if (response) await conversation.addResponse(response); +} +``` + +With `chat.createSession`, the iterator surfaces it as `turn.handover`; call `turn.complete()` with no argument on a final handover. The lower-level `chat.waitForHandover()` and `accumulator.applyHandover()` are also exported for hand-rolled loops. diff --git a/.changeset/chat-headstart-trigger-config.md b/.changeset/chat-headstart-trigger-config.md new file mode 100644 index 00000000000..d629d2fe8a9 --- /dev/null +++ b/.changeset/chat-headstart-trigger-config.md @@ -0,0 +1,15 @@ +--- +"@trigger.dev/sdk": patch +--- + +Add `triggerConfig` support to `chat.headStart()` and `chat.openSession()`, so the auto-triggered handover-prepare run inherits tags, queue, machine, and other session trigger options the same way `chat.createStartSessionAction()` does. The `chat:{chatId}` tag is prepended automatically. + +```ts +export const POST = chat.headStart({ + agentId: "my-agent", + triggerConfig: { tags: ["org:acme"], queue: "chat" }, + run: async ({ chat }) => streamText({ ...chat.toStreamTextOptions(), model }), +}); +``` + +Because the session is created once on the first head-start turn and is idempotent on the chat id, this is the only place to set those options for a head-start chat's lifetime. `chat.createStartSessionAction()` now also forwards `maxDuration`, `region`, and `lockToVersion` so both session entry points stay consistent. diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 6d4368f8141..9afb63e2180 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -1761,9 +1761,10 @@ const stopInput: RealtimeDefinedInputStream<{ stop: true; message?: string }> = * with pending tool calls (`"handover"` — agent picks up from tool * execution), or it finished pure-text (`"handover-skip"` — agent * exits cleanly without making an LLM call). - * @internal + * + * Returned by `chat.waitForHandover()` for custom-agent loops. */ -type HandoverSignal = +export type HandoverSignal = | { kind: "handover"; partialAssistantMessage: ModelMessage[]; @@ -1808,6 +1809,42 @@ const handoverInput = { }, }; +/** + * Wait for a `chat.headStart` handover signal inside a custom-agent loop or + * `chat.createSession`. Returns: + * - `null` — this run is not a `handover-prepare` boot, or the wait idled out / + * the warm handler crashed before signaling. Treat as "no handover". + * - `{ kind: "handover-skip" }` — the warm handler aborted; exit without a turn. + * - `{ kind: "handover", partialAssistantMessage, messageId?, isFinal }` — splice + * the partial (`chat.MessageAccumulator.applyHandover`) and, when `isFinal` is + * false, fall through to `streamText` to run the handed-over tool round. + * + * For the common case prefer `accumulator.consumeHandover()`, which also seeds + * `payload.headStartMessages` and applies the partial for you. + * + * Must be called at turn 0 before any `chat.messages.waitWithIdleTimeout` — + * that facade consumes and discards non-message chunks, which would swallow the + * handover signal. + */ +async function waitForHandover(options: { + /** The run's wire payload (only `trigger` / `idleTimeoutInSeconds` are read). */ + payload: { trigger?: string; idleTimeoutInSeconds?: number }; + idleTimeoutInSeconds?: number; + timeout?: string; + spanName?: string; +}): Promise { + if (options.payload.trigger !== "handover-prepare") return null; + const result = await handoverInput.waitWithIdleTimeout({ + idleTimeoutInSeconds: + options.idleTimeoutInSeconds ?? options.payload.idleTimeoutInSeconds ?? 60, + timeout: options.timeout, + spanName: options.spanName ?? "waiting for handover signal", + }); + // Non-ok = idle timeout or the warm handler crashed without signaling. + if (!result.ok) return null; + return result.output; +} + /** * Per-turn deferred promises. Registered via `chat.defer()`, awaited * before `onTurnComplete` fires. Reset each turn. @@ -1918,6 +1955,31 @@ function synthesizeHandoverUIMessage( } as UIMessage; } +/** + * Splice a head-start handover partial into an accumulating message pair + * (model + UI). Dedups by `messageId` against the UI chain (so a hydrated + * history that already persisted the partial isn't doubled), then pushes the + * partial into `modelMessages` and the synthesized UIMessage into `uiMessages`. + * Shared by the `chat.agent` turn-0 splice and `ChatMessageAccumulator.applyHandover`. + * @internal + */ +function spliceHandoverPartial( + modelMessages: ModelMessage[], + uiMessages: UIMessage[], + signal: { partialAssistantMessage: ModelMessage[]; messageId?: string } +): void { + if (!signal.partialAssistantMessage || signal.partialAssistantMessage.length === 0) { + return; + } + // Skip if the hydrated chain already persisted the partial under this id. + const alreadyInChain = + signal.messageId !== undefined && uiMessages.some((m) => m.id === signal.messageId); + if (alreadyInChain) return; + modelMessages.push(...signal.partialAssistantMessage); + const partialUI = synthesizeHandoverUIMessage(signal.partialAssistantMessage, signal.messageId); + if (partialUI) uiMessages.push(partialUI); +} + /** * Per-turn background context queue. Messages added via `chat.backgroundWork.inject()` * are drained at the next `prepareStep` boundary and appended to the model messages. @@ -6798,22 +6860,10 @@ function chatAgent< // `UIMessageStreamError: No tool invocation found`. const pendingHandoverPartial = locals.get(chatHandoverPartialKey); if (pendingHandoverPartial && pendingHandoverPartial.length > 0) { - const handoverMessageId = locals.get(chatHandoverMessageIdKey); - // Skip if the hydrated chain already persisted the - // partial under the handover messageId. - const alreadyInChain = - handoverMessageId !== undefined && - accumulatedUIMessages.some((m) => m.id === handoverMessageId); - if (!alreadyInChain) { - accumulatedMessages.push(...pendingHandoverPartial); - const partialUI = synthesizeHandoverUIMessage( - pendingHandoverPartial, - handoverMessageId - ); - if (partialUI) { - accumulatedUIMessages.push(partialUI as TUIMessage); - } - } + spliceHandoverPartial(accumulatedMessages, accumulatedUIMessages, { + partialAssistantMessage: pendingHandoverPartial, + messageId: locals.get(chatHandoverMessageIdKey), + }); locals.set(chatHandoverPartialKey, []); // consume once } } @@ -8748,7 +8798,7 @@ async function chatWriteTurnComplete(options?: { publicAccessToken?: string }): */ async function pipeChatAndCapture( source: UIMessageStreamable, - options?: { signal?: AbortSignal; spanName?: string } + options?: { signal?: AbortSignal; spanName?: string; originalMessages?: UIMessage[] } ): Promise { let captured: UIMessage | undefined; let resolveOnFinish: () => void; @@ -8759,6 +8809,10 @@ async function pipeChatAndCapture( const resolvedOptions = resolveUIMessageStreamOptions(); const uiStream = source.toUIMessageStream({ ...resolvedOptions, + // Thread the prior chain (incl. a spliced handover partial) so a resumed + // tool round's tool-output chunks merge into the originating tool-call + // instead of throwing "No tool invocation found". + ...(options?.originalMessages ? { originalMessages: options.originalMessages } : {}), // Stamp a server-generated id on the start chunk, same as chat.agent's // pipe. Without it the AI SDK regenerates the assistant id when a // prepareStep injection (steering) starts a new step mid-stream, and @@ -8844,10 +8898,76 @@ class ChatMessageAccumulator { this.modelMessages = await toModelMessages(uiMessages); } + /** + * Splice a `chat.headStart` handover partial into the accumulator (the warm + * step-1 response). Dedups by `messageId` so a seeded/hydrated history that + * already carries the partial isn't doubled. Seed any prior history first + * (e.g. `setMessages(payload.headStartMessages)`). Low-level — see + * `consumeHandover` for the wait+seed+apply convenience. + */ + applyHandover(signal: { partialAssistantMessage: ModelMessage[]; messageId?: string }): void { + spliceHandoverPartial(this.modelMessages, this.uiMessages, signal); + } + + /** + * One-call `chat.headStart` handover for a custom-agent loop: waits for the + * handover signal, seeds prior history from `payload.headStartMessages`, + * applies the warm step-1 partial, and reports what to do next. + * + * Returns `{ isFinal, skipped }`: + * - `skipped: true` — not a `handover-prepare` run, the wait idled out, or the + * warm handler aborted. Exit the run without a turn. + * - `isFinal: true` — step 1 IS the response (pure text). Write turn-complete + * and continue; do not call `streamText`. + * - `isFinal: false` — fall through to `streamText`, which runs the pending + * tool round handed over from step 1. + */ + async consumeHandover(options: { + payload: { + trigger?: string; + idleTimeoutInSeconds?: number; + headStartMessages?: UIMessage[]; + }; + idleTimeoutInSeconds?: number; + timeout?: string; + }): Promise<{ isFinal: boolean; skipped: boolean }> { + const signal = await waitForHandover({ + payload: options.payload, + idleTimeoutInSeconds: options.idleTimeoutInSeconds, + timeout: options.timeout, + }); + if (!signal || signal.kind === "handover-skip") { + return { isFinal: false, skipped: true }; + } + if (options.payload.headStartMessages && options.payload.headStartMessages.length > 0) { + await this.setMessages(options.payload.headStartMessages); + } + this.applyHandover(signal); + return { isFinal: signal.isFinal, skipped: false }; + } + async addResponse(response: UIMessage): Promise { if (!response.id) { response = { ...response, id: generateMessageId() }; } + // Tool-approval and handover-resume continuations reuse the trailing + // assistant's ID (via originalMessages on the pipe), so the captured + // response can carry the same ID as a message already in the chain + // (e.g. a spliced handover partial). Replace in place instead of pushing + // a duplicate, mirroring the chat.agent accumulator. + const existingIdx = this.uiMessages.findIndex((m) => m.id === response.id); + if (existingIdx !== -1) { + this.uiMessages[existingIdx] = response; + try { + // Reconvert all model messages since we replaced rather than appended. + this.modelMessages = await toModelMessages( + this.uiMessages.map((m) => stripProviderMetadata(m)) + ); + } catch { + // Conversion failed — leave the existing model messages in place + } + return; + } this.uiMessages.push(response); try { const msgs = await toModelMessages([stripProviderMetadata(response)]); @@ -9040,6 +9160,13 @@ export type ChatTurn = { previousTurnUsage?: LanguageModelUsage; /** Cumulative token usage across all completed turns so far. */ totalUsage: LanguageModelUsage; + /** + * Set on the first turn of a `chat.headStart` handover; `null` otherwise. + * When `isFinal` is true the warm step-1 IS the response — call + * `turn.complete()` with no argument (don't call `streamText`). When false, + * call `streamText` as usual; it runs the handed-over tool round. + */ + handover: { isFinal: boolean } | null; /** * Replace accumulated messages (for compaction). Takes UIMessages and @@ -9051,8 +9178,11 @@ export type ChatTurn = { /** * Easy path: pipe stream, capture response, accumulate it, * clean up aborted parts if stopped, and write turn-complete chunk. + * + * Call with no argument on a head-start final turn (`turn.handover?.isFinal`) + * — the warm step-1 partial is already the response, so there's nothing to pipe. */ - complete(source: UIMessageStreamable): Promise; + complete(source?: UIMessageStreamable): Promise; /** * Manual path: just write turn-complete chunk. @@ -9152,6 +9282,31 @@ function createChatSession( } turn++; + // Head-start handover: the server triggered this run with + // `trigger: "handover-prepare"` and signals the warm step-1 partial on + // `session.in`. Wait for it BEFORE any `messagesInput.waitWithIdleTimeout` + // (that facade consumes-and-discards non-message chunks and would swallow + // the signal). Turn-0 only — continuation boots never carry this trigger. + let handoverThisTurn: { isFinal: boolean } | null = null; + let pendingHandoverSignal: HandoverSignal | null = null; + if (turn === 0 && currentPayload.trigger === "handover-prepare") { + const signal = await waitForHandover({ + payload: currentPayload, + idleTimeoutInSeconds: + sessionIdleTimeoutOpt ?? currentPayload.idleTimeoutInSeconds ?? idleTimeoutInSeconds, + timeout, + }); + if (!signal || signal.kind === "handover-skip" || runSignal.aborted) { + stop.cleanup(); + return { done: true, value: undefined }; + } + pendingHandoverSignal = signal; + handoverThisTurn = { isFinal: signal.isFinal }; + // Rewrite to a normal first-turn message turn so the rest of the loop + // (steering setup, addIncoming, turnObj) runs unchanged. + currentPayload = { ...currentPayload, trigger: "submit-message", message: undefined }; + } + // First turn: wait when the boot payload carries no message. // Preload boots wait for the first real message; continuation // boots (fresh run via `ensureRunForSession` / end-and-continue) @@ -9274,6 +9429,17 @@ function createChatSession( turn ); + // Apply the head-start handover AFTER addIncoming — turn-0 addIncoming + // replaces accumulator state, which would wipe a pre-applied splice. + // Seed prior history first, then splice the warm step-1 partial. + if (pendingHandoverSignal) { + const priorHistory = currentPayload.headStartMessages as UIMessage[] | undefined; + if (priorHistory && priorHistory.length > 0) { + await accumulator.setMessages(priorHistory); + } + accumulator.applyHandover(pendingHandoverSignal); + } + // chat.requestUpgrade() called before this turn — signal transport and exit if (locals.get(chatUpgradeRequestedKey)) { await writeUpgradeRequiredChunk(); @@ -9302,15 +9468,31 @@ function createChatSession( continuation: currentPayload.continuation ?? false, previousTurnUsage, totalUsage: cumulativeUsage, + handover: handoverThisTurn, async setMessages(uiMessages: UIMessage[]) { await accumulator.setMessages(uiMessages); }, - async complete(source: UIMessageStreamable) { + async complete(source?: UIMessageStreamable) { + // Head-start final turn: the warm step-1 partial is already spliced + // into the accumulator and IS the response — nothing to pipe. + if (!source) { + sessionMsgSub.off(); + await chatWriteTurnComplete(); + return accumulator.uiMessages.at(-1); + } let response: UIMessage | undefined; try { - response = await pipeChatAndCapture(source, { signal: combinedSignal }); + response = await pipeChatAndCapture(source, { + signal: combinedSignal, + // On a non-final handover turn, thread the spliced partial so a + // resumed tool round's tool-output chunks merge into the + // handed-over tool-call. Gated on the handover turn only — a + // normal turn must not pass originalMessages (it would merge the + // fresh response into the prior assistant message). + ...(handoverThisTurn ? { originalMessages: accumulator.uiMessages } : {}), + }); } catch (error) { if (error instanceof Error && error.name === "AbortError") { if (runSignal.aborted) { @@ -9899,8 +10081,8 @@ function createChatStartSessionAction( // run-list filter by chat works without the customer having to wire it // up. Mirrors the browser-mediated `TriggerChatTransport.doStart` path. const userTags = params.triggerConfig?.tags ?? options?.triggerConfig?.tags ?? []; - // Platform cap is 10 tags per run; the auto chat tag takes one slot. - const tags = [`chat:${params.chatId}`, ...userTags].slice(0, 10); + // SessionTriggerConfig.tags allows at most 5; the auto chat tag takes one slot. + const tags = [`chat:${params.chatId}`, ...userTags].slice(0, 5); const clientDataMetadata = params.clientData !== undefined ? { metadata: params.clientData } : {}; @@ -9928,6 +10110,22 @@ function createChatStartSessionAction( params.triggerConfig?.maxAttempts ?? options?.triggerConfig?.maxAttempts!, } : {}), + ...(options?.triggerConfig?.maxDuration !== undefined || + params.triggerConfig?.maxDuration !== undefined + ? { + maxDuration: + params.triggerConfig?.maxDuration ?? options?.triggerConfig?.maxDuration!, + } + : {}), + ...(options?.triggerConfig?.region || params.triggerConfig?.region + ? { region: params.triggerConfig?.region ?? options?.triggerConfig?.region } + : {}), + ...(options?.triggerConfig?.lockToVersion || params.triggerConfig?.lockToVersion + ? { + lockToVersion: + params.triggerConfig?.lockToVersion ?? options?.triggerConfig?.lockToVersion, + } + : {}), ...(options?.triggerConfig?.idleTimeoutInSeconds !== undefined || params.triggerConfig?.idleTimeoutInSeconds !== undefined ? { @@ -10137,6 +10335,13 @@ export const chat = { MessageAccumulator: ChatMessageAccumulator, /** Create a chat session (async iterator). See {@link createChatSession}. */ createSession: createChatSession, + /** + * Wait for a `chat.headStart` handover signal inside a `chat.customAgent` + * loop (turn 0). See {@link waitForHandover}. For most loops prefer the + * `chat.MessageAccumulator.consumeHandover()` convenience, which also seeds + * `payload.headStartMessages` and applies the partial. + */ + waitForHandover, /** * Store and retrieve a resolved prompt for the current run. * diff --git a/packages/trigger-sdk/src/v3/chat-server.test.ts b/packages/trigger-sdk/src/v3/chat-server.test.ts index dc9ef11788f..b8def196999 100644 --- a/packages/trigger-sdk/src/v3/chat-server.test.ts +++ b/packages/trigger-sdk/src/v3/chat-server.test.ts @@ -216,6 +216,67 @@ describe("chat.headStart (route handler)", () => { expect(body.triggerConfig.basePayload.idleTimeoutInSeconds).toBe(60); }); + it("merges triggerConfig tags and queue into createSession", async () => { + const requests: CapturedRequest[] = []; + global.fetch = vi.fn().mockImplementation(async (url: string | URL, init?: RequestInit) => { + const urlStr = typeof url === "string" ? url : url.toString(); + requests.push({ url: urlStr, init }); + if (urlStr.endsWith("/api/v1/sessions") || urlStr.endsWith("/api/v1/sessions/")) { + return createSessionResponse("chat-1"); + } + if (urlStr.includes("/realtime/v1/sessions/") && urlStr.endsWith("/in/append")) { + return appendOkResponse(); + } + if (/\/realtime\/v1\/sessions\/[^/]+\/out$/.test(urlStr)) { + return new Response(new ReadableStream({ start(c) { c.close(); } }), { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + } + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const handler = chat.headStart({ + agentId: "test-agent", + triggerConfig: { + tags: ["org:acme", "agentic-run:xyz"], + queue: "my-queue", + }, + run: async ({ chat: chatHelper }) => { + return streamText({ + ...chatHelper.toStreamTextOptions(), + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("hi back") }), + }), + }); + }, + }); + + await withApiContext(() => + handler( + makeRequest({ + chatId: "chat-1", + trigger: "submit-message", + headStartMessages: [{ id: "m1", role: "user", parts: [{ type: "text", text: "hi" }] }], + }) + ) + ); + + const sessionCreate = requests.find((r) => + r.url.endsWith("/api/v1/sessions") || r.url.endsWith("/api/v1/sessions/") + ); + expect(sessionCreate).toBeDefined(); + const body = JSON.parse(sessionCreate!.init!.body as string); + expect(body.triggerConfig.tags).toEqual([ + "chat:chat-1", + "org:acme", + "agentic-run:xyz", + ]); + expect(body.triggerConfig.queue).toBe("my-queue"); + expect(body.triggerConfig.basePayload.trigger).toBe("handover-prepare"); + expect(body.triggerConfig.basePayload.chatId).toBe("chat-1"); + }); + it("dispatches handover with isFinal=true on pure-text finishReason", async () => { const requests: CapturedRequest[] = []; global.fetch = vi.fn().mockImplementation(async (url: string | URL, init?: RequestInit) => { diff --git a/packages/trigger-sdk/src/v3/chat-server.ts b/packages/trigger-sdk/src/v3/chat-server.ts index 95e4fb529cd..7436cf91800 100644 --- a/packages/trigger-sdk/src/v3/chat-server.ts +++ b/packages/trigger-sdk/src/v3/chat-server.ts @@ -59,6 +59,7 @@ import { SessionStreamInstance, TRIGGER_CONTROL_SUBTYPE, apiClientManager, + type SessionTriggerConfig, } from "@trigger.dev/core/v3"; // Runtime VALUES via the ESM/CJS shim so the CJS build can `require` ESM-only // `ai@7` (see ../imports/ai-runtime.ts). @@ -195,6 +196,12 @@ export type HeadStartHandlerOptions> = { * exiting. Defaults to 60. */ idleTimeoutInSeconds?: number; + /** + * Run options for the auto-triggered `handover-prepare` session run — + * tags, queue, machine, etc. Mirrors `chat.createStartSessionAction`. + * The `chat:{chatId}` tag is prepended automatically. + */ + triggerConfig?: Partial; }; // --------------------------------------------------------------------------- @@ -220,6 +227,7 @@ export const chat = { req, agentId: opts.agentId, idleTimeoutInSeconds: opts.idleTimeoutInSeconds, + triggerConfig: opts.triggerConfig, }); const helper: HeadStartChatHelper = { @@ -249,6 +257,7 @@ export const chat = { req: Request; agentId: string; idleTimeoutInSeconds?: number; + triggerConfig?: Partial; }): Promise { return openHandoverSession(opts).then((s) => s.handle); }, @@ -304,6 +313,7 @@ async function openHandoverSession(opts: { req: Request; agentId: string; idleTimeoutInSeconds?: number; + triggerConfig?: Partial; }): Promise { const wirePayload = (await opts.req.json()) as ChatTaskWirePayload; const chatId = wirePayload.chatId; @@ -323,7 +333,39 @@ async function openHandoverSession(opts: { const modelMessages = await convertToModelMessages(uiMessages); const apiClient = resolveApiClient(); - const idleTimeoutInSeconds = opts.idleTimeoutInSeconds ?? 60; + // Top-level `idleTimeoutInSeconds` wins over the one in `triggerConfig`. + const idleTimeoutInSeconds = + opts.idleTimeoutInSeconds ?? opts.triggerConfig?.idleTimeoutInSeconds ?? 60; + + // Merge the customer's trigger options. `handover-prepare` and `chatId` in + // `basePayload` are ours and can't be overridden; the `chat:{chatId}` tag is + // prepended (SessionTriggerConfig.tags caps at 5). + const userTags = opts.triggerConfig?.tags ?? []; + const tags = [`chat:${chatId}`, ...userTags].slice(0, 5); + + const triggerConfig: SessionTriggerConfig = { + basePayload: { + ...(opts.triggerConfig?.basePayload ?? {}), + ...wirePayload, + chatId, + trigger: "handover-prepare", + idleTimeoutInSeconds, + }, + ...(opts.triggerConfig?.machine ? { machine: opts.triggerConfig.machine } : {}), + ...(opts.triggerConfig?.queue ? { queue: opts.triggerConfig.queue } : {}), + tags, + ...(opts.triggerConfig?.maxAttempts !== undefined + ? { maxAttempts: opts.triggerConfig.maxAttempts } + : {}), + ...(opts.triggerConfig?.maxDuration !== undefined + ? { maxDuration: opts.triggerConfig.maxDuration } + : {}), + ...(opts.triggerConfig?.region ? { region: opts.triggerConfig.region } : {}), + ...(opts.triggerConfig?.lockToVersion + ? { lockToVersion: opts.triggerConfig.lockToVersion } + : {}), + idleTimeoutInSeconds, + }; // Create the session and trigger the chat.agent's `handover-prepare` // run atomically. `createSession` is idempotent on `(env, externalId @@ -342,15 +384,7 @@ async function openHandoverSession(opts: { type: "chat.agent", externalId: chatId, taskIdentifier: opts.agentId, - triggerConfig: { - basePayload: { - ...wirePayload, - chatId, - trigger: "handover-prepare", - idleTimeoutInSeconds, - }, - idleTimeoutInSeconds, - }, + triggerConfig, }); const sessionPublicAccessToken = created.publicAccessToken; diff --git a/packages/trigger-sdk/src/v3/createStartSessionAction.test.ts b/packages/trigger-sdk/src/v3/createStartSessionAction.test.ts index 2b3214b77d1..d1a6a9b61d2 100644 --- a/packages/trigger-sdk/src/v3/createStartSessionAction.test.ts +++ b/packages/trigger-sdk/src/v3/createStartSessionAction.test.ts @@ -96,6 +96,42 @@ describe("chat.createStartSessionAction — runtime", () => { expect(lastStartBody?.triggerConfig.basePayload).not.toHaveProperty("metadata"); }); + it("prepends chat:{chatId} to triggerConfig.tags and caps at 5", async () => { + installStartFixture(); + + const start = chat.createStartSessionAction("fake-chat", { + triggerConfig: { + tags: ["org:acme", "a", "b", "c", "d", "e"], + }, + }); + await start({ chatId: "chat-tags" }); + + expect(lastStartBody?.triggerConfig.tags).toEqual([ + "chat:chat-tags", + "org:acme", + "a", + "b", + "c", + ]); + }); + + it("forwards maxDuration, region, and lockToVersion from triggerConfig", async () => { + installStartFixture(); + + const start = chat.createStartSessionAction("fake-chat", { + triggerConfig: { + maxDuration: 120, + region: "us-east-1", + lockToVersion: "20260101.1", + }, + }); + await start({ chatId: "chat-parity" }); + + expect(lastStartBody?.triggerConfig.maxDuration).toBe(120); + expect(lastStartBody?.triggerConfig.region).toBe("us-east-1"); + expect(lastStartBody?.triggerConfig.lockToVersion).toBe("20260101.1"); + }); + it("keeps session-level metadata distinct from per-turn clientData", async () => { installStartFixture(); diff --git a/packages/trigger-sdk/test/chatHandoverBackends.test.ts b/packages/trigger-sdk/test/chatHandoverBackends.test.ts new file mode 100644 index 00000000000..29b45172e9e --- /dev/null +++ b/packages/trigger-sdk/test/chatHandoverBackends.test.ts @@ -0,0 +1,414 @@ +// Import the test harness FIRST — installs the resource catalog so the +// chat task functions below register correctly. +import { mockChatAgent } from "../src/v3/test/index.js"; + +import { describe, expect, it, vi } from "vitest"; +import { chat } from "../src/v3/ai.js"; +import { simulateReadableStream, streamText, tool } from "ai"; +import { MockLanguageModelV3 } from "ai/test"; +import type { LanguageModelV3StreamPart } from "@ai-sdk/provider"; +import type { ModelMessage, UIMessage } from "ai"; +import { z } from "zod"; + +// ── Helpers ──────────────────────────────────────────────────────────── + +function textStream(text: string): ReadableStream { + return simulateReadableStream({ + chunks: [ + { type: "text-start", id: "t1" }, + { type: "text-delta", id: "t1", delta: text }, + { type: "text-end", id: "t1" }, + { + type: "finish", + finishReason: { unified: "stop", raw: "stop" }, + usage: { + inputTokens: { total: 10, noCache: 10, cacheRead: undefined, cacheWrite: undefined }, + outputTokens: { total: 10, text: 10, reasoning: undefined }, + }, + }, + ], + }); +} + +type Capture = { + skipped?: boolean; + isFinal?: boolean; + handover?: { isFinal: boolean } | null; + uiMessages?: Array<{ id: string; role: string; partTypes: string[] }>; +}; + +function snapshot(uiMessages: UIMessage[]): Capture["uiMessages"] { + return uiMessages.map((m) => ({ + id: m.id, + role: m.role, + partTypes: (m.parts ?? []).map((p) => p.type), + })); +} + +// A pure-text partial (the warm step-1 response, isFinal: true). +const PURE_TEXT_PARTIAL: ModelMessage[] = [ + { role: "assistant", content: [{ type: "text", text: "Hi there, hope you're well." }] }, +]; + +// A tool-call partial reshaped server-side into the approval round (isFinal: false). +const TOOL_CALL_PARTIAL: ModelMessage[] = [ + { + role: "assistant", + content: [ + { type: "text", text: "let me check the weather" }, + { type: "tool-call", toolCallId: "tc-1", toolName: "weather", input: { city: "tokyo" } }, + { type: "tool-approval-request", approvalId: "handover-approval-1", toolCallId: "tc-1" } as never, + ], + }, + { + role: "tool", + content: [ + { type: "tool-approval-response", approvalId: "handover-approval-1", approved: true } as never, + ], + }, +]; + +function weatherToolWithExecute(execute: (input: { city: string }) => Promise) { + return tool({ + description: "Look up weather", + inputSchema: z.object({ city: z.string() }), + execute: execute as never, + }); +} + +// ── chat.customAgent + headStart handover ─────────────────────────────── + +describe("chat.customAgent + headStart handover", () => { + it("consumeHandover skip → clean exit, no turn-complete", async () => { + const capture: Capture = {}; + const runAfter = vi.fn(); + + const agent = chat.customAgent({ + id: "custom.handover.skip", + run: async (payload) => { + const conversation = new chat.MessageAccumulator(); + const { isFinal, skipped } = await conversation.consumeHandover({ payload }); + capture.skipped = skipped; + capture.isFinal = isFinal; + if (skipped) return; + runAfter(); + await chat.writeTurnComplete(); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "t-skip", mode: "handover-prepare" }); + try { + await harness.sendHandoverSkip(); + await new Promise((r) => setTimeout(r, 20)); + expect(capture.skipped).toBe(true); + expect(capture.isFinal).toBe(false); + expect(runAfter).not.toHaveBeenCalled(); + expect(harness.allChunks).toHaveLength(0); + } finally { + await harness.close(); + } + }); + + it("consumeHandover isFinal: true (pure text) → partial spliced, no streamText", async () => { + const capture: Capture = {}; + const runAfter = vi.fn(); + + const agent = chat.customAgent({ + id: "custom.handover.final", + run: async (payload) => { + const conversation = new chat.MessageAccumulator(); + const { isFinal, skipped } = await conversation.consumeHandover({ payload }); + capture.skipped = skipped; + capture.isFinal = isFinal; + capture.uiMessages = snapshot(conversation.uiMessages); + if (skipped) return; + if (isFinal) { + await chat.writeTurnComplete(); + return; + } + runAfter(); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "t-final", mode: "handover-prepare" }); + try { + await harness.sendHandover({ + partialAssistantMessage: PURE_TEXT_PARTIAL, + messageId: "asst-msg-1", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 20)); + expect(capture.skipped).toBe(false); + expect(capture.isFinal).toBe(true); + // The warm step-1 partial is in the accumulator under its messageId. + expect(capture.uiMessages).toEqual([ + { id: "asst-msg-1", role: "assistant", partTypes: ["text"] }, + ]); + // isFinal means no streamText. + expect(runAfter).not.toHaveBeenCalled(); + } finally { + await harness.close(); + } + }); + + it("consumeHandover isFinal: false (tool call) → streamText runs the handed-over tool round", async () => { + const capture: Capture = {}; + const toolExecute = vi.fn(async ({ city }: { city: string }) => ({ city, temp: 22 })); + + const agent = chat.customAgent({ + id: "custom.handover.toolcall", + run: async (payload) => { + const conversation = new chat.MessageAccumulator(); + const { isFinal, skipped } = await conversation.consumeHandover({ payload }); + capture.skipped = skipped; + capture.isFinal = isFinal; + if (skipped) return; + if (isFinal) { + await chat.writeTurnComplete(); + return; + } + const result = streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("the weather in tokyo is 22°C") }), + }), + messages: conversation.modelMessages, + tools: { weather: weatherToolWithExecute(toolExecute) }, + }); + await chat.pipeAndCapture(result); + await chat.writeTurnComplete(); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "t-tool", mode: "handover-prepare" }); + try { + await harness.sendHandover({ + partialAssistantMessage: TOOL_CALL_PARTIAL, + messageId: "asst-msg-2", + isFinal: false, + }); + // Handover consumed as non-final, and the handed-over tool round resumed: + // the agent-side `execute` ran on the pending tool-call from step 1 + // (schema-only-on-warm-handler pattern). The full step-2-text-through-handover + // path is verified end-to-end by the ai-chat-e2e smoke test (T29). + expect(capture.isFinal).toBe(false); + expect(toolExecute).toHaveBeenCalledWith( + expect.objectContaining({ city: "tokyo" }), + expect.anything() + ); + } finally { + await harness.close(); + } + }); + + it("addResponse replaces the spliced partial in place when the resume reuses its id", async () => { + // On a non-final handover resume the pipe threads originalMessages, so the + // captured response carries the SAME id as the spliced partial. addResponse + // must replace it, not append a duplicate (else the persisted accumulator + // ends up with two assistant messages — caught live by T29, not the mock pipe). + const capture: Capture = {}; + + const agent = chat.customAgent({ + id: "custom.handover.addresponse-dedup", + run: async (payload) => { + const conversation = new chat.MessageAccumulator(); + await conversation.consumeHandover({ payload }); + // Simulate the merged step-2 response reusing the partial's id. + await conversation.addResponse({ + id: "asst-msg-2", + role: "assistant", + parts: [ + { type: "text", text: "the weather in tokyo is 22°C" }, + { + type: "tool-weather", + toolCallId: "tc-1", + state: "output-available", + input: { city: "tokyo" }, + output: { city: "tokyo", temp: 22 }, + } as never, + ], + }); + capture.uiMessages = snapshot(conversation.uiMessages); + await chat.writeTurnComplete(); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "t-addresp-dedup", mode: "handover-prepare" }); + try { + await harness.sendHandover({ + partialAssistantMessage: TOOL_CALL_PARTIAL, + messageId: "asst-msg-2", + isFinal: false, + }); + await new Promise((r) => setTimeout(r, 20)); + // Exactly one assistant message under the handover id — replaced, not doubled. + expect(capture.uiMessages!.filter((m) => m.id === "asst-msg-2")).toHaveLength(1); + expect(capture.uiMessages!.filter((m) => m.role === "assistant")).toHaveLength(1); + // And it carries the merged step-2 content (text + resolved tool output). + expect(capture.uiMessages!.at(-1)?.partTypes).toEqual(["text", "tool-weather"]); + } finally { + await harness.close(); + } + }); + + it("seeds payload.headStartMessages before splicing the partial", async () => { + const capture: Capture = {}; + const prior: UIMessage[] = [ + { id: "u-1", role: "user", parts: [{ type: "text", text: "hello" }] }, + ]; + + const agent = chat.customAgent({ + id: "custom.handover.seed", + run: async (payload) => { + const conversation = new chat.MessageAccumulator(); + await conversation.consumeHandover({ payload }); + capture.uiMessages = snapshot(conversation.uiMessages); + await chat.writeTurnComplete(); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "t-seed", + mode: "handover-prepare", + headStartMessages: prior, + }); + try { + await harness.sendHandover({ + partialAssistantMessage: PURE_TEXT_PARTIAL, + messageId: "asst-msg-3", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 20)); + // Prior history first, then the warm partial. + expect(capture.uiMessages).toEqual([ + { id: "u-1", role: "user", partTypes: ["text"] }, + { id: "asst-msg-3", role: "assistant", partTypes: ["text"] }, + ]); + } finally { + await harness.close(); + } + }); + + it("dedups the partial when headStartMessages already carries its messageId", async () => { + const capture: Capture = {}; + const prior: UIMessage[] = [ + { id: "u-1", role: "user", parts: [{ type: "text", text: "hello" }] }, + // Already-persisted partial under the same id the handover uses. + { id: "asst-dup", role: "assistant", parts: [{ type: "text", text: "Hi there, hope you're well." }] }, + ]; + + const agent = chat.customAgent({ + id: "custom.handover.dedup", + run: async (payload) => { + const conversation = new chat.MessageAccumulator(); + await conversation.consumeHandover({ payload }); + capture.uiMessages = snapshot(conversation.uiMessages); + await chat.writeTurnComplete(); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "t-dedup", + mode: "handover-prepare", + headStartMessages: prior, + }); + try { + await harness.sendHandover({ + partialAssistantMessage: PURE_TEXT_PARTIAL, + messageId: "asst-dup", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 20)); + // Not doubled — still just the two seeded messages. + expect(capture.uiMessages).toHaveLength(2); + expect(capture.uiMessages!.filter((m) => m.id === "asst-dup")).toHaveLength(1); + } finally { + await harness.close(); + } + }); +}); + +// ── chat.createSession + headStart handover ────────────────────────────── + +describe("chat.createSession + headStart handover", () => { + it("turn.handover.isFinal: true → complete() with no source finalizes the partial", async () => { + const capture: Capture = {}; + const runAfter = vi.fn(); + + const agent = chat.customAgent({ + id: "session.handover.final", + run: async (payload) => { + const session = chat.createSession(payload, { signal: new AbortController().signal }); + for await (const turn of session) { + capture.handover = turn.handover; + capture.uiMessages = snapshot(turn.uiMessages); + if (turn.handover?.isFinal) { + await turn.complete(); + return; + } + runAfter(); + return; + } + }, + }); + + const harness = mockChatAgent(agent, { chatId: "s-final", mode: "handover-prepare" }); + try { + await harness.sendHandover({ + partialAssistantMessage: PURE_TEXT_PARTIAL, + messageId: "asst-msg-4", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 20)); + expect(capture.handover).toEqual({ isFinal: true }); + expect(capture.uiMessages).toEqual([ + { id: "asst-msg-4", role: "assistant", partTypes: ["text"] }, + ]); + expect(runAfter).not.toHaveBeenCalled(); + } finally { + await harness.close(); + } + }); + + it("turn.handover.isFinal: false → streamText runs the handed-over tool round", async () => { + const capture: Capture = {}; + const toolExecute = vi.fn(async ({ city }: { city: string }) => ({ city, temp: 22 })); + + const agent = chat.customAgent({ + id: "session.handover.toolcall", + run: async (payload) => { + const session = chat.createSession(payload, { signal: new AbortController().signal }); + for await (const turn of session) { + capture.handover = turn.handover; + const result = streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("the weather in tokyo is 22°C") }), + }), + messages: turn.messages, + tools: { weather: weatherToolWithExecute(toolExecute) }, + abortSignal: turn.signal, + }); + await turn.complete(result); + return; + } + }, + }); + + const harness = mockChatAgent(agent, { chatId: "s-tool", mode: "handover-prepare" }); + try { + await harness.sendHandover({ + partialAssistantMessage: TOOL_CALL_PARTIAL, + messageId: "asst-msg-5", + isFinal: false, + }); + // Surfaced as a non-final handover turn, and the handed-over tool round + // resumed (agent-side execute ran). Full step-2-text path covered by T29. + expect(capture.handover).toEqual({ isFinal: false }); + expect(toolExecute).toHaveBeenCalledWith( + expect.objectContaining({ city: "tokyo" }), + expect.anything() + ); + } finally { + await harness.close(); + } + }); +}); From 32f05360ed8ce03e5baaa060ba18d32752912fa1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 16 Jun 2026 13:33:34 +0100 Subject: [PATCH 2/2] fix(sdk): guard no-source turn.complete() to final handover turns --- packages/trigger-sdk/src/v3/ai.ts | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 9afb63e2180..685bb8909ad 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -9476,11 +9476,24 @@ function createChatSession( async complete(source?: UIMessageStreamable) { // Head-start final turn: the warm step-1 partial is already spliced - // into the accumulator and IS the response — nothing to pipe. + // into the accumulator and IS the response — nothing to pipe. Only + // valid on a final handover; a missing source on any other turn is a + // mistake (it would silently finalize without an assistant response). if (!source) { + if (!handoverThisTurn?.isFinal) { + throw new Error( + "turn.complete() requires a stream source unless turn.handover.isFinal is true" + ); + } + const response = accumulator.uiMessages.at(-1); + if (!response || response.role !== "assistant") { + throw new Error( + "turn.complete() could not find the spliced handover response" + ); + } sessionMsgSub.off(); await chatWriteTurnComplete(); - return accumulator.uiMessages.at(-1); + return response; } let response: UIMessage | undefined; try {