refactor(sse): consolidate client SSE readers behind a single typed primitive#5195
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. |
PR SummaryMedium Risk Overview Replaces duplicated Fixes a latent split-across-chunks bug from per-chunk Reviewed by Cursor Bugbot for commit 228c698. Configure here. |
|
@greptile review |
|
@cursor review |
There was a problem hiding this comment.
✅ Bugbot reviewed your changes and found no new issues!
2 issues from previous reviews remain unresolved.
Comment @cursor review or bugbot run to trigger another review on this PR
Reviewed by Cursor Bugbot for commit 225bf82. Configure here.
Greptile SummaryThis PR replaces 5+ copy-pasted client SSE reader loops — each with slightly different
Confidence Score: 5/5This is a well-executed refactor with no correctness gaps; safe to merge. The previously-identified issues — missing return true in the final event success branch in chat.tsx, and the reader lock never being released in processSSEStream — were both addressed in prior commits. The new readSSELines core correctly polls signal?.aborted before each chunk and between events, releases locks only when it owns them, and propagates both sync and async early-stop signals. The 52 unit tests pin all the edge-case behaviors. No files require special attention. The most complex file is use-chat.ts, but the migration is straightforward and the behavioral change — stopping immediately after sawCompleteEvent rather than draining the chunk — is an improvement. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[SSESource
Response / ReadableStream / Reader] --> B[toReader
ownsLock?]
B -->|ReadableStream / Response ownsLock = true| C[getReader]
B -->|ReadableStreamDefaultReader ownsLock = false| D[use as-is]
C --> E[readSSELines]
D --> E
E --> F{signal?.aborted?}
F -->|yes| Z[return early]
F -->|no| G[reader.read]
G -->|done = true| H[break loop]
G -->|chunk| I[TextDecoder.decode split on newline]
I --> J{line starts with data:?}
J -->|no| K[skip blank / non-data lines]
K --> F
J -->|yes| L[strip CR, strip optional space, check DONE sentinel]
L -->|DONE| K
L -->|payload| M{signal?.aborted between events?}
M -->|yes| Z
M -->|no| N[onData callback]
N -->|returns true| Z
N -->|other| F
H --> O{ownsLock?}
Z --> O
O -->|yes| P[reader.releaseLock]
O -->|no| Q[caller owns lifecycle]
E --> R[readSSEEvents JSON.parse layer]
R -->|parse ok| S[onEvent callback]
R -->|parse error| T[onParseError? or skip]
S -->|return true| U[early stop]
S -->|other| E
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
A[SSESource
Response / ReadableStream / Reader] --> B[toReader
ownsLock?]
B -->|ReadableStream / Response ownsLock = true| C[getReader]
B -->|ReadableStreamDefaultReader ownsLock = false| D[use as-is]
C --> E[readSSELines]
D --> E
E --> F{signal?.aborted?}
F -->|yes| Z[return early]
F -->|no| G[reader.read]
G -->|done = true| H[break loop]
G -->|chunk| I[TextDecoder.decode split on newline]
I --> J{line starts with data:?}
J -->|no| K[skip blank / non-data lines]
K --> F
J -->|yes| L[strip CR, strip optional space, check DONE sentinel]
L -->|DONE| K
L -->|payload| M{signal?.aborted between events?}
M -->|yes| Z
M -->|no| N[onData callback]
N -->|returns true| Z
N -->|other| F
H --> O{ownsLock?}
Z --> O
O -->|yes| P[reader.releaseLock]
O -->|no| Q[caller owns lifecycle]
E --> R[readSSEEvents JSON.parse layer]
R -->|parse ok| S[onEvent callback]
R -->|parse error| T[onParseError? or skip]
S -->|return true| U[early stop]
S -->|other| E
Reviews (3): Last reviewed commit: "refactor(sse): consolidate client SSE re..." | Re-trigger Greptile |
Greptile SummaryThis refactor consolidates five divergent client-side SSE read loops into a single decode engine (
Confidence Score: 4/5Safe to merge with a fix for the missing lock release in The core SSE primitive and its test suite are solid. The lock-release omission in apps/sim/hooks/use-execution-stream.ts needs a Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Caller
participant readSSEEvents
participant readSSELines
participant toReader
participant Reader
Caller->>readSSEEvents: source (Response/Stream/Reader), options
readSSEEvents->>readSSELines: "source, { signal, onData }"
readSSELines->>toReader: source
toReader-->>readSSELines: "{ reader, ownsLock }"
Note over toReader: ownsLock=true for Response/Stream<br/>ownsLock=false for Reader
loop while not done / not aborted
readSSELines->>Reader: read()
Reader-->>readSSELines: "{ done, value }"
readSSELines->>readSSELines: decode + split on newline
loop per data: line
readSSELines->>readSSEEvents: onData(raw)
readSSEEvents->>readSSEEvents: JSON.parse(raw)
readSSEEvents->>Caller: onEvent(parsed)
Caller-->>readSSEEvents: "SSEStopSignal (true = stop)"
end
end
Note over readSSELines: finally: if ownsLock → releaseLock()
readSSELines-->>readSSEEvents: done
readSSEEvents-->>Caller: done
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Caller
participant readSSEEvents
participant readSSELines
participant toReader
participant Reader
Caller->>readSSEEvents: source (Response/Stream/Reader), options
readSSEEvents->>readSSELines: "source, { signal, onData }"
readSSELines->>toReader: source
toReader-->>readSSELines: "{ reader, ownsLock }"
Note over toReader: ownsLock=true for Response/Stream<br/>ownsLock=false for Reader
loop while not done / not aborted
readSSELines->>Reader: read()
Reader-->>readSSELines: "{ done, value }"
readSSELines->>readSSELines: decode + split on newline
loop per data: line
readSSELines->>readSSEEvents: onData(raw)
readSSEEvents->>readSSEEvents: JSON.parse(raw)
readSSEEvents->>Caller: onEvent(parsed)
Caller-->>readSSEEvents: "SSEStopSignal (true = stop)"
end
end
Note over readSSELines: finally: if ownsLock → releaseLock()
readSSELines-->>readSSEEvents: done
readSSEEvents-->>Caller: done
Reviews (2): Last reviewed commit: "refactor(sse): consolidate client SSE re..." | Re-trigger Greptile |
225bf82 to
adbb1cb
Compare
adbb1cb to
a8e2682
Compare
…rimitive Replace four hand-rolled client SSE decode loops with two layered primitives in lib/core/utils/sse.ts: - readSSELines: the single byte-stream decode engine. Splits on \n, strips trailing \r, tolerates data: with/without a leading space, skips the [DONE] sentinel, honors an AbortSignal before each chunk and between events, and releases the reader lock only when it acquired it. - readSSEEvents<T>: a thin JSON layer that parses each payload and routes unparseable lines to onParseError (default: skip). An SSESource union accepts a Response, a ReadableStream, or an already-acquired reader so callers that must stash the reader for external cancellation keep ownership of the lock. Migrates use-execution-stream, chat use-chat-streaming, home use-chat (via readSSELines for schema-validated decode), and the workflow chat panel. Legacy server/wand exports (encodeSSE, SSE_HEADERS, readSSEStream) are untouched. Behavior is preserved across abort, RAF batching, TTS, [DONE], delimiter tolerance, and reader-lock ownership. Tests in sse.test.ts pin the prior behavior: \n and \n\n framing, mid-chunk splits, [DONE], data: with/without leading space, \r\n stripping, sync/async early-stop, pre-aborted and mid-stream abort, lock release/non-release per source, lock release on a throwing handler, and Response/stream/reader sources.
|
@greptile review |
a8e2682 to
228c698
Compare
|
@cursor review |
There was a problem hiding this comment.
✅ Bugbot reviewed your changes and found no new issues!
Comment @cursor review or bugbot run to trigger another review on this PR
Reviewed by Cursor Bugbot for commit 228c698. Configure here.

Summary
\nvs\n\nframing,\rhandling, abort, and typing) with one decode engine inlib/core/utils/sse.ts:readSSELines(raw byte→line engine) +readSSEEvents<T>(typed JSON layer).use-execution-stream, deployed-chat, home-chat, and workflow-editor-chat readers to delegate — each keeps its own typed events, RAF batching, TTS, reconnect, and abort behavior. Behavior is identical.split('\n\n')could drop events straddling chunk boundaries.EventSourcetables hook.connectors/utils.tsis a binary byte-cap reader (not SSE) and is unchanged.Type of Change
Testing
lib/core/utils/sse.test.tspin prior behavior:\n/\n\nframing, mid-chunk splits,[DONE],data:with/without leading space,\r\nstripping, early-stop, pre-aborted + mid-stream abort, reader-lock release/non-release, Response vs stream source.tsc0 errors, Biome clean.Checklist