Skip to content

refactor(sse): consolidate client SSE readers behind a single typed primitive#5195

Merged
waleedlatif1 merged 1 commit into
stagingfrom
refactor/sse-reader-consolidation
Jun 24, 2026
Merged

refactor(sse): consolidate client SSE readers behind a single typed primitive#5195
waleedlatif1 merged 1 commit into
stagingfrom
refactor/sse-reader-consolidation

Conversation

@waleedlatif1

Copy link
Copy Markdown
Collaborator

Summary

  • Replace 5+ copy-pasted client SSE reader loops (which disagreed on \n vs \n\n framing, \r handling, abort, and typing) with one decode engine in lib/core/utils/sse.ts: readSSELines (raw byte→line engine) + readSSEEvents<T> (typed JSON layer).
  • Migrate use-execution-stream, deployed-chat, home-chat, and workflow-editor-chat readers to delegate — each keeps its own typed events, RAF batching, TTS, reconnect, and abort behavior. Behavior is identical.
  • Fixes a latent bug where the old per-chunk split('\n\n') could drop events straddling chunk boundaries.
  • Left deliberate specializations untouched: the generated-contract copilot parser and the browser-native EventSource tables hook. connectors/utils.ts is a binary byte-cap reader (not SSE) and is unchanged.

Type of Change

  • Refactor / improvement

Testing

  • 52 unit tests in lib/core/utils/sse.test.ts pin prior behavior: \n/\n\n framing, mid-chunk splits, [DONE], data: with/without leading space, \r\n stripping, early-stop, pre-aborted + mid-stream abort, reader-lock release/non-release, Response vs stream source.
  • tsc 0 errors, Biome clean.

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel

vercel Bot commented Jun 24, 2026

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Jun 24, 2026 4:47pm

Request Review

@cursor

cursor Bot commented Jun 24, 2026

Copy link
Copy Markdown

PR Summary

Medium Risk
Refactors every major client streaming path (chat + workflow execution); behavior is intended to be identical but regressions in abort, terminal events, or partial chunks would affect live UX.

Overview
Introduces readSSELines and readSSEEvents<T> in lib/core/utils/sse.ts as the shared client decode path for SSE: incremental \n framing (works with both \n and \n\n servers), [DONE] skipping, optional AbortSignal, early exit when handlers return true, and correct reader lock release when the helper acquires the stream.

Replaces duplicated getReader + TextDecoder loops in deployed chat (use-chat-streaming), mothership home chat (use-chat via raw lines + schema validation), workflow editor chat, and execution streaming (processSSEStream). Each caller keeps its own event handling (RAF batching, TTS, reconnect, dispatch); only framing/parsing moves central.

Fixes a latent split-across-chunks bug from per-chunk split('\n\n') that could drop events at boundaries. Adds broad unit coverage in sse.test.ts and reader-lock tests in use-execution-stream.test.ts.

Reviewed by Cursor Bugbot for commit 228c698. Configure here.

Comment thread apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Comment thread apps/sim/hooks/use-execution-stream.ts
@waleedlatif1

Copy link
Copy Markdown
Collaborator Author

@greptile review

@waleedlatif1

Copy link
Copy Markdown
Collaborator Author

@cursor review

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Bugbot reviewed your changes and found no new issues!

2 issues from previous reviews remain unresolved.

Fix All in Cursor

Comment @cursor review or bugbot run to trigger another review on this PR

Reviewed by Cursor Bugbot for commit 225bf82. Configure here.

@greptile-apps

greptile-apps Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR replaces 5+ copy-pasted client SSE reader loops — each with slightly different \n/\n\n framing, \r handling, and abort semantics — with a single decode engine (readSSELines + readSSEEvents) in lib/core/utils/sse.ts. All four consumers are migrated to delegate to the primitive while keeping their own UI batching, TTS, and abort behavior intact.

  • Core primitive (sse.ts): readSSELines decodes bytes → \n-split lines → data: payloads, fixing a latent boundary bug where the old per-chunk split('\n\n') could silently drop events that straddled chunk boundaries. readSSEEvents adds JSON parsing on top. Both functions own the reader lifecycle only when they acquire the lock; caller-owned readers are respected.
  • Consumer migrations: use-execution-stream.ts now releases the reader lock in a finally (previously missing); chat.tsx correctly returns true from the final event handler on both success and error paths; the post-loop flushChunks()/finalizeMessageStream() call is the single terminal path.
  • Test coverage: 52 unit tests pin \n/\n\n framing, mid-chunk splits, [DONE], data: with/without leading space, \r\n stripping, early-stop, pre-aborted + mid-stream abort, reader-lock release, and all three source types.

Confidence Score: 5/5

This is a well-executed refactor with no correctness gaps; safe to merge.

The previously-identified issues — missing return true in the final event success branch in chat.tsx, and the reader lock never being released in processSSEStream — were both addressed in prior commits. The new readSSELines core correctly polls signal?.aborted before each chunk and between events, releases locks only when it owns them, and propagates both sync and async early-stop signals. The 52 unit tests pin all the edge-case behaviors.

No files require special attention. The most complex file is use-chat.ts, but the migration is straightforward and the behavioral change — stopping immediately after sawCompleteEvent rather than draining the chunk — is an improvement.

Important Files Changed

Filename Overview
apps/sim/lib/core/utils/sse.ts New readSSELines + readSSEEvents primitive; correctly handles lock ownership, abort signal polling, \r\n, [DONE], and early-stop via return true. Well-documented and well-tested.
apps/sim/lib/core/utils/sse.test.ts 52 unit tests covering all framing variants, mid-chunk splits, abort paths, lock lifecycle for all three source types, and parse-error routing. Coverage is thorough.
apps/sim/hooks/use-execution-stream.ts Migrated to readSSEEvents; processSSEStream now wraps the read in try/finally { reader.releaseLock() }, restoring the always-release guarantee on every exit path.
apps/sim/hooks/use-execution-stream.test.ts Added two tests asserting reader lock is released after normal completion and when a handler throws.
apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx Migrated to readSSEEvents; final event returns true on both success and error paths; single post-loop flushChunks()/finalizeMessageStream() is the only terminal path. AbortError filtering in the catch block is preserved.
apps/sim/app/chat/hooks/use-chat-streaming.ts Migrated to readSSEEvents; response.body (stream) is passed directly so the primitive owns the reader lifecycle. Abort handled via the existing AbortController signal. The terminated flag correctly gates post-stream TTS/UI flush.
apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts Migrated to readSSELines; caller-owned reader correctly passes ownsLock: false so the primitive skips releaseLock(). The sawCompleteEvent early-stop is now checked per-event (immediately after dispatch), which is more responsive than the old per-chunk check.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[SSESource
Response / ReadableStream / Reader] --> B[toReader
ownsLock?]
    B -->|ReadableStream / Response ownsLock = true| C[getReader]
    B -->|ReadableStreamDefaultReader ownsLock = false| D[use as-is]
    C --> E[readSSELines]
    D --> E
    E --> F{signal?.aborted?}
    F -->|yes| Z[return early]
    F -->|no| G[reader.read]
    G -->|done = true| H[break loop]
    G -->|chunk| I[TextDecoder.decode split on newline]
    I --> J{line starts with data:?}
    J -->|no| K[skip blank / non-data lines]
    K --> F
    J -->|yes| L[strip CR, strip optional space, check DONE sentinel]
    L -->|DONE| K
    L -->|payload| M{signal?.aborted between events?}
    M -->|yes| Z
    M -->|no| N[onData callback]
    N -->|returns true| Z
    N -->|other| F
    H --> O{ownsLock?}
    Z --> O
    O -->|yes| P[reader.releaseLock]
    O -->|no| Q[caller owns lifecycle]
    E --> R[readSSEEvents JSON.parse layer]
    R -->|parse ok| S[onEvent callback]
    R -->|parse error| T[onParseError? or skip]
    S -->|return true| U[early stop]
    S -->|other| E
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    A[SSESource
Response / ReadableStream / Reader] --> B[toReader
ownsLock?]
    B -->|ReadableStream / Response ownsLock = true| C[getReader]
    B -->|ReadableStreamDefaultReader ownsLock = false| D[use as-is]
    C --> E[readSSELines]
    D --> E
    E --> F{signal?.aborted?}
    F -->|yes| Z[return early]
    F -->|no| G[reader.read]
    G -->|done = true| H[break loop]
    G -->|chunk| I[TextDecoder.decode split on newline]
    I --> J{line starts with data:?}
    J -->|no| K[skip blank / non-data lines]
    K --> F
    J -->|yes| L[strip CR, strip optional space, check DONE sentinel]
    L -->|DONE| K
    L -->|payload| M{signal?.aborted between events?}
    M -->|yes| Z
    M -->|no| N[onData callback]
    N -->|returns true| Z
    N -->|other| F
    H --> O{ownsLock?}
    Z --> O
    O -->|yes| P[reader.releaseLock]
    O -->|no| Q[caller owns lifecycle]
    E --> R[readSSEEvents JSON.parse layer]
    R -->|parse ok| S[onEvent callback]
    R -->|parse error| T[onParseError? or skip]
    S -->|return true| U[early stop]
    S -->|other| E
Loading

Reviews (3): Last reviewed commit: "refactor(sse): consolidate client SSE re..." | Re-trigger Greptile

Comment thread apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx Outdated
Comment thread apps/sim/hooks/use-execution-stream.ts Outdated
@greptile-apps

greptile-apps Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This refactor consolidates five divergent client-side SSE read loops into a single decode engine (readSSELines / readSSEEvents) in lib/core/utils/sse.ts, then migrates use-execution-stream, use-chat-streaming, home/use-chat, and workflow-editor chat to delegate to it. The core primitive is well-designed — correct lock-ownership tracking, abort-signal propagation, -stripping, and 52 unit tests covering all framing variants.

  • readSSELines splits on \ , treating each data: line independently; this correctly handles both \ - and \ \ -framed servers and fixes the latent cross-chunk event split bug.
  • processSSEStream in use-execution-stream.ts lost its finally { reader.releaseLock() } guard: because a ReadableStreamDefaultReader is passed rather than a stream, readSSEEvents sets ownsLock: false and never releases the lock — leaking it on every execution stream.
  • chat.tsx now calls finalizeMessageStream twice on the error path (once inside onEvent, once at the post-readSSEEvents call site) causing a redundant Zustand state update and re-render.

Confidence Score: 4/5

Safe to merge with a fix for the missing lock release in processSSEStream; the chat double-finalize is harmless in practice but worth cleaning up.

The core SSE primitive and its test suite are solid. The lock-release omission in processSSEStream is a concrete regression: every workflow execution now leaves a permanent lock on response.body, blocking cancellation and preventing the body stream from being GC'd promptly. The chat double-finalize is idempotent but adds a spurious Zustand update on every errored stream. Everything else — framing, abort, stale-check, TTS, RAF batching — migrates correctly.

apps/sim/hooks/use-execution-stream.ts needs a finally { reader.releaseLock() } wrapper around the readSSEEvents call to restore the lock-release guarantee that was present before the refactor.

Important Files Changed

Filename Overview
apps/sim/lib/core/utils/sse.ts New readSSELines + readSSEEvents decode engine; correct lock-ownership tracking, abort signal propagation, and \r stripping.
apps/sim/lib/core/utils/sse.test.ts 52 new tests covering framing variants, mid-chunk splits, abort, lock-ownership, and early-stop; comprehensive and accurate.
apps/sim/hooks/use-execution-stream.ts Refactored to delegate to readSSEEvents(reader, …); the original finally { reader.releaseLock() } was silently dropped and is not compensated.
apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx Delegates to readSSEEvents; error-path now calls finalizeMessageStream twice (inside handler and again after readSSEEvents returns) whereas the original called it once via a hard return.
apps/sim/app/chat/hooks/use-chat-streaming.ts Migrates deployed-chat SSE loop to readSSEEvents; abort and terminated flag semantics are equivalent to the original.
apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts Migrates home-chat SSE loop to readSSELines; staleness and complete-event early-stop semantics preserved correctly.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller
    participant readSSEEvents
    participant readSSELines
    participant toReader
    participant Reader

    Caller->>readSSEEvents: source (Response/Stream/Reader), options
    readSSEEvents->>readSSELines: "source, { signal, onData }"
    readSSELines->>toReader: source
    toReader-->>readSSELines: "{ reader, ownsLock }"
    Note over toReader: ownsLock=true for Response/Stream<br/>ownsLock=false for Reader

    loop while not done / not aborted
        readSSELines->>Reader: read()
        Reader-->>readSSELines: "{ done, value }"
        readSSELines->>readSSELines: decode + split on newline
        loop per data: line
            readSSELines->>readSSEEvents: onData(raw)
            readSSEEvents->>readSSEEvents: JSON.parse(raw)
            readSSEEvents->>Caller: onEvent(parsed)
            Caller-->>readSSEEvents: "SSEStopSignal (true = stop)"
        end
    end

    Note over readSSELines: finally: if ownsLock → releaseLock()
    readSSELines-->>readSSEEvents: done
    readSSEEvents-->>Caller: done
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Caller
    participant readSSEEvents
    participant readSSELines
    participant toReader
    participant Reader

    Caller->>readSSEEvents: source (Response/Stream/Reader), options
    readSSEEvents->>readSSELines: "source, { signal, onData }"
    readSSELines->>toReader: source
    toReader-->>readSSELines: "{ reader, ownsLock }"
    Note over toReader: ownsLock=true for Response/Stream<br/>ownsLock=false for Reader

    loop while not done / not aborted
        readSSELines->>Reader: read()
        Reader-->>readSSELines: "{ done, value }"
        readSSELines->>readSSELines: decode + split on newline
        loop per data: line
            readSSELines->>readSSEEvents: onData(raw)
            readSSEEvents->>readSSEEvents: JSON.parse(raw)
            readSSEEvents->>Caller: onEvent(parsed)
            Caller-->>readSSEEvents: "SSEStopSignal (true = stop)"
        end
    end

    Note over readSSELines: finally: if ownsLock → releaseLock()
    readSSELines-->>readSSEEvents: done
    readSSEEvents-->>Caller: done
Loading

Reviews (2): Last reviewed commit: "refactor(sse): consolidate client SSE re..." | Re-trigger Greptile

Comment thread apps/sim/hooks/use-execution-stream.ts Outdated
Comment thread apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx Outdated
@waleedlatif1 waleedlatif1 force-pushed the refactor/sse-reader-consolidation branch from 225bf82 to adbb1cb Compare June 24, 2026 16:31
@waleedlatif1 waleedlatif1 force-pushed the refactor/sse-reader-consolidation branch from adbb1cb to a8e2682 Compare June 24, 2026 16:34
…rimitive

Replace four hand-rolled client SSE decode loops with two layered
primitives in lib/core/utils/sse.ts:

- readSSELines: the single byte-stream decode engine. Splits on \n,
  strips trailing \r, tolerates data: with/without a leading space,
  skips the [DONE] sentinel, honors an AbortSignal before each chunk and
  between events, and releases the reader lock only when it acquired it.
- readSSEEvents<T>: a thin JSON layer that parses each payload and routes
  unparseable lines to onParseError (default: skip).

An SSESource union accepts a Response, a ReadableStream, or an
already-acquired reader so callers that must stash the reader for
external cancellation keep ownership of the lock.

Migrates use-execution-stream, chat use-chat-streaming, home use-chat
(via readSSELines for schema-validated decode), and the workflow chat
panel. Legacy server/wand exports (encodeSSE, SSE_HEADERS,
readSSEStream) are untouched. Behavior is preserved across abort, RAF
batching, TTS, [DONE], delimiter tolerance, and reader-lock ownership.

Tests in sse.test.ts pin the prior behavior: \n and \n\n framing,
mid-chunk splits, [DONE], data: with/without leading space, \r\n
stripping, sync/async early-stop, pre-aborted and mid-stream abort,
lock release/non-release per source, lock release on a throwing
handler, and Response/stream/reader sources.
@waleedlatif1

Copy link
Copy Markdown
Collaborator Author

@greptile review

@waleedlatif1 waleedlatif1 force-pushed the refactor/sse-reader-consolidation branch from a8e2682 to 228c698 Compare June 24, 2026 16:47
@waleedlatif1

Copy link
Copy Markdown
Collaborator Author

@cursor review

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

Reviewed by Cursor Bugbot for commit 228c698. Configure here.

@waleedlatif1 waleedlatif1 merged commit 038e8f0 into staging Jun 24, 2026
16 checks passed
@waleedlatif1 waleedlatif1 deleted the refactor/sse-reader-consolidation branch June 24, 2026 17:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant