feat(sse): invoke finalize on response stream end#2741
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR refactors SSE cleanup behavior and stream management: RequestHandler exposes a reusable ChangesSSE Cleanup and Finalization Behavior
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
commit: |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
test/browser/sse-api/sse.finally.test.ts (1)
15-127: ⚡ Quick winAdd a multi-connection regression test.
The three scenarios cover happy paths for a single connection but won't catch the per-handler emitter cross-fire concern flagged in
src/core/sse.ts(one connection's close running another connection'sfinalize). A test that opens twoEventSourceinstances against the same handler, closes the first, and asserts that the second connection'sfinalizehas NOT yet been invoked would protect against that regression once it's fixed.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@test/browser/sse-api/sse.finally.test.ts` around lines 15 - 127, Add a new regression test that opens two EventSource connections to the same sse handler and verifies closing the first does not trigger the second handler's finalize; specifically, in the test use setupWorker and sse with a finalize(() => window.notifyFinalizedSecond()) for the second connection (and notifyFinalizedFirst for the first), create two EventSource instances in the page.evaluate, close the first (or call client.close() in the handler for the first), and assert that the Promise for the second connection's finalize has not resolved after the first closes (then finally close the second and assert its finalize resolves); reference the existing symbols setupWorker, sse, finalize, client.close and the exposed functions notifyFinalized* to locate where to add this new test.src/core/sse.ts (1)
311-337: ⚡ Quick winMake
error()consistent withclose()(state ordering + try/catch).
close()swallows controller exceptions and uses#closed.resolve(), buterror()does not. Ifthis.#controller.error()throws (e.g., the stream was already cancelled by the reader, putting the controller in a non-pending state on the controller side while#closedis still pending here),#closednever resolves and the'error'event never gets emitted — both client and any awaiters of#closedbecome stuck. Mirroring the structure used inclose()keeps the two methods symmetric and idempotent.♻️ Proposed fix
public error(): void { if (this.#closed.state !== 'pending') { return } - this.#controller.error() - this.#closed.resolve() - this[kClientEmitter]?.emit(new TypedEvent('error')) + this.#closed.resolve() + try { + this.#controller.error() + } catch { + // + } + this[kClientEmitter]?.emit(new TypedEvent('error')) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/core/sse.ts` around lines 311 - 337, The error() method must be made symmetric to close(): first check this.#closed.state !== 'pending' and return, then call this.#controller.error() inside a try/catch (swallowing controller exceptions), always call this.#closed.resolve() after the controller call (even if controller.error() threw), and finally emit the 'error' event via this[kClientEmitter]?.emit(new TypedEvent('error')); modify the error() implementation in the class containing methods error() and close() to mirror close()’s ordering and try/catch to ensure idempotence and that `#closed` is always resolved.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/core/sse.ts`:
- Around line 200-211: The current exhaustCleanups registers listeners on the
shared `#emitter` which causes cross-connection firing and misses synchronous
closes; change it to use a per-connection close signal instead of `#emitter`:
create a per-connection promise/emitter (or use the existing `#closed` deferred
tied to this client instance) and, inside exhaustCleanups, if that
per-connection closed is already resolved call super.exhaustCleanups(cleanups)
immediately, otherwise attach a single per-connection listener (or await the
per-connection close promise) to invoke super.exhaustCleanups(cleanups) when
that specific connection closes; remove usage of the shared `#emitter` for
per-connection cleanup wiring to avoid cross-fire and race conditions when
resolvers call client.close() synchronously.
In `@src/mockServiceWorker.js`:
- Around line 137-141: The content-type check in the conditional that guards
sendToClient is too strict and fails to detect SSE headers with parameters
(e.g., "text/event-stream; charset=utf-8"), causing responseClone.body to be
transferred and corrupt SSE streams; update the check used where client,
activeClientIds.has(client.id) and response.headers.get('content-type') are
evaluated so it treats any content type that starts with or has the MIME prefix
"text/event-stream" as SSE (e.g., use a startsWith or parse-and-compare on
response.headers.get('content-type')), and ensure sendToClient is not passed
responseClone.body for those SSE responses to avoid detaching the stream
(references: client, activeClientIds, response.headers.get, responseClone.body,
sendToClient, core/sse.ts).
---
Nitpick comments:
In `@src/core/sse.ts`:
- Around line 311-337: The error() method must be made symmetric to close():
first check this.#closed.state !== 'pending' and return, then call
this.#controller.error() inside a try/catch (swallowing controller exceptions),
always call this.#closed.resolve() after the controller call (even if
controller.error() threw), and finally emit the 'error' event via
this[kClientEmitter]?.emit(new TypedEvent('error')); modify the error()
implementation in the class containing methods error() and close() to mirror
close()’s ordering and try/catch to ensure idempotence and that `#closed` is
always resolved.
In `@test/browser/sse-api/sse.finally.test.ts`:
- Around line 15-127: Add a new regression test that opens two EventSource
connections to the same sse handler and verifies closing the first does not
trigger the second handler's finalize; specifically, in the test use setupWorker
and sse with a finalize(() => window.notifyFinalizedSecond()) for the second
connection (and notifyFinalizedFirst for the first), create two EventSource
instances in the page.evaluate, close the first (or call client.close() in the
handler for the first), and assert that the Promise for the second connection's
finalize has not resolved after the first closes (then finally close the second
and assert its finalize resolves); reference the existing symbols setupWorker,
sse, finalize, client.close and the exposed functions notifyFinalized* to locate
where to add this new test.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: c04fab05-4a10-4231-942a-2f89db1e3242
📒 Files selected for processing (4)
src/core/handlers/RequestHandler.tssrc/core/sse.tssrc/mockServiceWorker.jstest/browser/sse-api/sse.finally.test.ts
|
|
||
| protected async exhaustCleanups( | ||
| cleanups: Array<() => MaybePromise<void>>, | ||
| ): Promise<void> { | ||
| const onClose = () => { | ||
| this.#emitter.removeListener('error', onClose) | ||
| this.#emitter.removeListener('close', onClose) | ||
| void super.exhaustCleanups(cleanups) | ||
| } | ||
|
|
||
| this.#emitter.once('error', onClose).once('close', onClose) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm the emitter is per-handler (single instance for all connections of a given sse route)
# and that there is no per-connection scoping when emitting 'close'/'error'.
rg -nP -C3 '#emitter\s*[:=]|kClientEmitter|new\s+Emitter<' --type=ts
rg -nP -C2 'emit\(\s*new\s+TypedEvent\(' --type=tsRepository: mswjs/msw
Length of output: 3767
🏁 Script executed:
# Find where exhaustCleanups is called and the surrounding context
rg -nP -B5 -A5 'exhaustCleanups' src/core/sse.ts | head -100
# Check the resolver handler and finalize registration
rg -nP -B10 -A5 'runScheduledCleanups|finalize|resolver\(' src/core/sse.ts | head -150Repository: mswjs/msw
Length of output: 895
🏁 Script executed:
# Find the parent class and runScheduledCleanups
rg -nP 'class HttpHandler|runScheduledCleanups|finalize' src/core/ --type=ts | head -50
# Check the full handler flow
rg -nP -B15 'await resolver' src/core/sse.ts | head -80Repository: mswjs/msw
Length of output: 1799
🏁 Script executed:
# Get the full runScheduledCleanups implementation
sed -n '535,570p' src/core/handlers/RequestHandler.tsRepository: mswjs/msw
Length of output: 971
🏁 Script executed:
# Check if there are any tests that demonstrate synchronous close or multiple connections
fd -e test.ts -e spec.ts src/ | xargs rg -l 'sse\|ServerSentEvent' | head -5
# Get the createEventStream function to see if there's any per-connection state
rg -nP -B5 -A20 'function createEventStream|const createEventStream' src/core/sse.tsRepository: mswjs/msw
Length of output: 1183
🏁 Script executed:
# Look for test files related to sse
find src -type f -name '*.test.ts' -o -name '*.spec.ts' | xargs grep -l 'sse\|ServerSentEvent' 2>/dev/null | head -3Repository: mswjs/msw
Length of output: 174
🏁 Script executed:
# Search for SSE-specific tests
find src -type f \( -name '*.test.ts' -o -name '*.spec.ts' \) | xargs grep -l 'ServerSentEventHandler\|http\.sse' 2>/dev/null
# Also check for any actual SSE handler usage examples
rg -l 'http\.sse\|sse\(' --type=ts src/ | grep -v node_modulesRepository: mswjs/msw
Length of output: 35
Cleanup wiring on the shared emitter cross-fires across connections and has a startup race.
Two correctness issues stem from registering cleanups on the per-handler #emitter:
-
Multi-connection cross-fire.
#emitteris shared by every connection that thissse(...)handler accepts. When connection A closes, the emitter emitscloseonce and fires everyonce('close', onClose)listener registered so far — including theonClosecaptured for connection B's cleanups. B'sclearInterval(or other per-connection cleanup from the issue#2630example) will run while B is still streaming, dropping its background work. -
Resolver-synchronous close race. If a resolver synchronously calls
client.close()after registeringfinalize(callback), the event fires beforeexhaustCleanupsattaches listeners. The cleanup will never run because the'close'event is emitted synchronously (line 336) while the resolver is still executing, beforerunScheduledCleanups→exhaustCleanupscan attach the listener (line 210).
The cleanup binding must be per-connection. One option is to track close state on the client (the #closed deferred already exists) and pass a per-connection emitter or close-promise into the resolver context, running super.exhaustCleanups(cleanups) when that promise resolves — handling both "already closed" (run immediately) and "closes later" (await) cases.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/core/sse.ts` around lines 200 - 211, The current exhaustCleanups
registers listeners on the shared `#emitter` which causes cross-connection firing
and misses synchronous closes; change it to use a per-connection close signal
instead of `#emitter`: create a per-connection promise/emitter (or use the
existing `#closed` deferred tied to this client instance) and, inside
exhaustCleanups, if that per-connection closed is already resolved call
super.exhaustCleanups(cleanups) immediately, otherwise attach a single
per-connection listener (or await the per-connection close promise) to invoke
super.exhaustCleanups(cleanups) when that specific connection closes; remove
usage of the shared `#emitter` for per-connection cleanup wiring to avoid
cross-fire and race conditions when resolvers call client.close() synchronously.
|
Tested and works as expected: The
|
finalize on response stream endfinalize on response stream end
Warning
This includes the worker script changes, making it a breaking change.
finalizeAPI for handler cleanup #2738onCleanupcallback to schedule side effect cleanup when resolvers are finished #2630Todos
RequestHandlerlevel, if possible, to supportfinalize()for user-providedReadableStreamin mocked responses. Otherwise, it will be called immediately when the response is returned and not when the stream is closed.text/event-streamresponses will not emitresponse:*events. Doing so requires cloning the stream, and cloning the stream breaks cancelation forwarding for all consumers.