Skip to content

feat(sse): invoke finalize on response stream end#2741

Draft
kettanaito wants to merge 4 commits into
mainfrom
fix/sse-finalize
Draft

feat(sse): invoke finalize on response stream end#2741
kettanaito wants to merge 4 commits into
mainfrom
fix/sse-finalize

Conversation

@kettanaito
Copy link
Copy Markdown
Member

@kettanaito kettanaito commented May 7, 2026

Warning

This includes the worker script changes, making it a breaking change.

Todos

  • Consider rewriting this to the RequestHandler level, if possible, to support finalize() for user-provided ReadableStream in mocked responses. Otherwise, it will be called immediately when the response is returned and not when the stream is closed.
  • Document that text/event-stream responses will not emit response:* events. Doing so requires cloning the stream, and cloning the stream breaks cancelation forwarding for all consumers.
  • Double-check emitter being shared across requests. Add a test that has multiple SSE connections at the same time with different closure logic. Ensure their cleanups fire independently.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 7, 2026

Review Change Stack

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: a8f057ee-74ff-44a0-9523-6520c47035ca

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR refactors SSE cleanup behavior and stream management: RequestHandler exposes a reusable protected exhaustCleanups() helper; SSE streaming migrates from TransformStream to ReadableStream with direct controller access; SSE handlers override cleanup to attach lifecycle listeners; mockServiceWorker filters out SSE content-type from client lifecycle messages; browser tests verify cleanup callback timing across connection-close, handler-close, and handler-error scenarios.

Changes

SSE Cleanup and Finalization Behavior

Layer / File(s) Summary
Cleanup Infrastructure
src/core/handlers/RequestHandler.ts
RequestHandler introduces a protected async exhaustCleanups(cleanups) method that runs cleanup callbacks sequentially, collects errors, and reports failures via devUtils.error() with AggregateError. runScheduledCleanups() delegates to this helper for both iterator-associated and scheduled cleanups.
SSE Event Type Updates
src/core/sse.ts
ServerSentEventClientEventMap is updated to type message, error, and close as TypedEvent values with proper payload typing instead of untyped listener declarations.
SSE Stream Architecture
src/core/sse.ts
ServerSentEventClient is refactored to store a ReadableStreamDefaultController<Uint8Array> and a deferred "closed" promise. The error(), close(), enqueue(), and sendMessage() methods now interact directly with the controller. createEventStream() creates a ReadableStream with a cancellation handler that closes the client.
SSE Handler Integration and Logging
src/core/sse.ts
ServerSentEventHandler implements a protected exhaustCleanups() override that registers one-time error/close listeners, removes them on terminal state, and delegates to super.exhaustCleanups(). The log() method cancels the response stream body. Client message logging uses typed event payloads with data.event.
Client Message Filtering
src/mockServiceWorker.js
Response notifications to active clients are now filtered to exclude content-type: text/event-stream, preventing SSE responses from triggering client lifecycle messages.
SSE Cleanup Behavior Tests
test/browser/sse-api/sse.finally.test.ts
New Playwright tests verify that the finalize callback runs after client close, handler close, and handler error events. Each test measures timing and asserts the callback resolves no earlier than the observable termination event.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • mswjs/msw#2738: Both PRs modify RequestHandler's cleanup/finalization logic—exposing/centralizing cleanup execution and enabling handler-level overrides—for consistent cleanup handling.
  • mswjs/msw#2718: Both PRs refactor the SSE implementation in src/core/sse.ts, changing stream architecture from TransformStream to ReadableStream and integrating cleanup/abort wiring.
  • mswjs/msw#2725: Both PRs modify RequestHandler's cleanup and lifecycle behavior—the main PR exposes exhaustCleanups() for subclass overrides while the related PR adds generator state reset/restore logic.

Suggested labels

BREAKING CHANGE

Poem

🐰 From writeable streams we hop away,
To readable controllers' sway,
Each cleanup waits with gentle care,
Till events close—no trace to spare,
Now promises finalize the way! ✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Linked Issues check ✅ Passed The code changes implement cleanup callback support for SSE handlers via a protected exhaustCleanups helper and override pattern, directly addressing the requirement for cleanup notification when connections terminate [#2630].
Out of Scope Changes check ✅ Passed All changes focus on implementing cleanup callback support for SSE (RequestHandler cleanup extraction, SSE client lifecycle updates, worker script filtering) and adding corresponding tests, all aligned with issue #2630.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Title check ✅ Passed The title directly summarizes the main change: making SSE response streams invoke finalize when the stream ends. This matches the PR's core objective of implementing cleanup callback support for SSE connections.
Description check ✅ Passed The PR description is directly related to the changeset. It explains the purpose (fixing SSE finalize behavior), references the related issue (#2630), acknowledges breaking changes, and notes follow-up work items.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/sse-finalize

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented May 7, 2026

Open in StackBlitz

npm i https://pkg.pr.new/msw@2741

commit: 82e4879

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
test/browser/sse-api/sse.finally.test.ts (1)

15-127: ⚡ Quick win

Add a multi-connection regression test.

The three scenarios cover happy paths for a single connection but won't catch the per-handler emitter cross-fire concern flagged in src/core/sse.ts (one connection's close running another connection's finalize). A test that opens two EventSource instances against the same handler, closes the first, and asserts that the second connection's finalize has NOT yet been invoked would protect against that regression once it's fixed.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@test/browser/sse-api/sse.finally.test.ts` around lines 15 - 127, Add a new
regression test that opens two EventSource connections to the same sse handler
and verifies closing the first does not trigger the second handler's finalize;
specifically, in the test use setupWorker and sse with a finalize(() =>
window.notifyFinalizedSecond()) for the second connection (and
notifyFinalizedFirst for the first), create two EventSource instances in the
page.evaluate, close the first (or call client.close() in the handler for the
first), and assert that the Promise for the second connection's finalize has not
resolved after the first closes (then finally close the second and assert its
finalize resolves); reference the existing symbols setupWorker, sse, finalize,
client.close and the exposed functions notifyFinalized* to locate where to add
this new test.
src/core/sse.ts (1)

311-337: ⚡ Quick win

Make error() consistent with close() (state ordering + try/catch).

close() swallows controller exceptions and uses #closed.resolve(), but error() does not. If this.#controller.error() throws (e.g., the stream was already cancelled by the reader, putting the controller in a non-pending state on the controller side while #closed is still pending here), #closed never resolves and the 'error' event never gets emitted — both client and any awaiters of #closed become stuck. Mirroring the structure used in close() keeps the two methods symmetric and idempotent.

♻️ Proposed fix
   public error(): void {
     if (this.#closed.state !== 'pending') {
       return
     }

-    this.#controller.error()
-    this.#closed.resolve()
-    this[kClientEmitter]?.emit(new TypedEvent('error'))
+    this.#closed.resolve()
+    try {
+      this.#controller.error()
+    } catch {
+      //
+    }
+    this[kClientEmitter]?.emit(new TypedEvent('error'))
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/core/sse.ts` around lines 311 - 337, The error() method must be made
symmetric to close(): first check this.#closed.state !== 'pending' and return,
then call this.#controller.error() inside a try/catch (swallowing controller
exceptions), always call this.#closed.resolve() after the controller call (even
if controller.error() threw), and finally emit the 'error' event via
this[kClientEmitter]?.emit(new TypedEvent('error')); modify the error()
implementation in the class containing methods error() and close() to mirror
close()’s ordering and try/catch to ensure idempotence and that `#closed` is
always resolved.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/core/sse.ts`:
- Around line 200-211: The current exhaustCleanups registers listeners on the
shared `#emitter` which causes cross-connection firing and misses synchronous
closes; change it to use a per-connection close signal instead of `#emitter`:
create a per-connection promise/emitter (or use the existing `#closed` deferred
tied to this client instance) and, inside exhaustCleanups, if that
per-connection closed is already resolved call super.exhaustCleanups(cleanups)
immediately, otherwise attach a single per-connection listener (or await the
per-connection close promise) to invoke super.exhaustCleanups(cleanups) when
that specific connection closes; remove usage of the shared `#emitter` for
per-connection cleanup wiring to avoid cross-fire and race conditions when
resolvers call client.close() synchronously.

In `@src/mockServiceWorker.js`:
- Around line 137-141: The content-type check in the conditional that guards
sendToClient is too strict and fails to detect SSE headers with parameters
(e.g., "text/event-stream; charset=utf-8"), causing responseClone.body to be
transferred and corrupt SSE streams; update the check used where client,
activeClientIds.has(client.id) and response.headers.get('content-type') are
evaluated so it treats any content type that starts with or has the MIME prefix
"text/event-stream" as SSE (e.g., use a startsWith or parse-and-compare on
response.headers.get('content-type')), and ensure sendToClient is not passed
responseClone.body for those SSE responses to avoid detaching the stream
(references: client, activeClientIds, response.headers.get, responseClone.body,
sendToClient, core/sse.ts).

---

Nitpick comments:
In `@src/core/sse.ts`:
- Around line 311-337: The error() method must be made symmetric to close():
first check this.#closed.state !== 'pending' and return, then call
this.#controller.error() inside a try/catch (swallowing controller exceptions),
always call this.#closed.resolve() after the controller call (even if
controller.error() threw), and finally emit the 'error' event via
this[kClientEmitter]?.emit(new TypedEvent('error')); modify the error()
implementation in the class containing methods error() and close() to mirror
close()’s ordering and try/catch to ensure idempotence and that `#closed` is
always resolved.

In `@test/browser/sse-api/sse.finally.test.ts`:
- Around line 15-127: Add a new regression test that opens two EventSource
connections to the same sse handler and verifies closing the first does not
trigger the second handler's finalize; specifically, in the test use setupWorker
and sse with a finalize(() => window.notifyFinalizedSecond()) for the second
connection (and notifyFinalizedFirst for the first), create two EventSource
instances in the page.evaluate, close the first (or call client.close() in the
handler for the first), and assert that the Promise for the second connection's
finalize has not resolved after the first closes (then finally close the second
and assert its finalize resolves); reference the existing symbols setupWorker,
sse, finalize, client.close and the exposed functions notifyFinalized* to locate
where to add this new test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c04fab05-4a10-4231-942a-2f89db1e3242

📥 Commits

Reviewing files that changed from the base of the PR and between 91a5d41 and f5bbf27.

📒 Files selected for processing (4)
  • src/core/handlers/RequestHandler.ts
  • src/core/sse.ts
  • src/mockServiceWorker.js
  • test/browser/sse-api/sse.finally.test.ts

Comment thread src/core/sse.ts
Comment on lines +200 to +211

protected async exhaustCleanups(
cleanups: Array<() => MaybePromise<void>>,
): Promise<void> {
const onClose = () => {
this.#emitter.removeListener('error', onClose)
this.#emitter.removeListener('close', onClose)
void super.exhaustCleanups(cleanups)
}

this.#emitter.once('error', onClose).once('close', onClose)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm the emitter is per-handler (single instance for all connections of a given sse route)
# and that there is no per-connection scoping when emitting 'close'/'error'.
rg -nP -C3 '#emitter\s*[:=]|kClientEmitter|new\s+Emitter<' --type=ts
rg -nP -C2 'emit\(\s*new\s+TypedEvent\(' --type=ts

Repository: mswjs/msw

Length of output: 3767


🏁 Script executed:

# Find where exhaustCleanups is called and the surrounding context
rg -nP -B5 -A5 'exhaustCleanups' src/core/sse.ts | head -100

# Check the resolver handler and finalize registration
rg -nP -B10 -A5 'runScheduledCleanups|finalize|resolver\(' src/core/sse.ts | head -150

Repository: mswjs/msw

Length of output: 895


🏁 Script executed:

# Find the parent class and runScheduledCleanups
rg -nP 'class HttpHandler|runScheduledCleanups|finalize' src/core/ --type=ts | head -50

# Check the full handler flow
rg -nP -B15 'await resolver' src/core/sse.ts | head -80

Repository: mswjs/msw

Length of output: 1799


🏁 Script executed:

# Get the full runScheduledCleanups implementation
sed -n '535,570p' src/core/handlers/RequestHandler.ts

Repository: mswjs/msw

Length of output: 971


🏁 Script executed:

# Check if there are any tests that demonstrate synchronous close or multiple connections
fd -e test.ts -e spec.ts src/ | xargs rg -l 'sse\|ServerSentEvent' | head -5

# Get the createEventStream function to see if there's any per-connection state
rg -nP -B5 -A20 'function createEventStream|const createEventStream' src/core/sse.ts

Repository: mswjs/msw

Length of output: 1183


🏁 Script executed:

# Look for test files related to sse
find src -type f -name '*.test.ts' -o -name '*.spec.ts' | xargs grep -l 'sse\|ServerSentEvent' 2>/dev/null | head -3

Repository: mswjs/msw

Length of output: 174


🏁 Script executed:

# Search for SSE-specific tests
find src -type f \( -name '*.test.ts' -o -name '*.spec.ts' \) | xargs grep -l 'ServerSentEventHandler\|http\.sse' 2>/dev/null

# Also check for any actual SSE handler usage examples
rg -l 'http\.sse\|sse\(' --type=ts src/ | grep -v node_modules

Repository: mswjs/msw

Length of output: 35


Cleanup wiring on the shared emitter cross-fires across connections and has a startup race.

Two correctness issues stem from registering cleanups on the per-handler #emitter:

  1. Multi-connection cross-fire. #emitter is shared by every connection that this sse(...) handler accepts. When connection A closes, the emitter emits close once and fires every once('close', onClose) listener registered so far — including the onClose captured for connection B's cleanups. B's clearInterval (or other per-connection cleanup from the issue #2630 example) will run while B is still streaming, dropping its background work.

  2. Resolver-synchronous close race. If a resolver synchronously calls client.close() after registering finalize(callback), the event fires before exhaustCleanups attaches listeners. The cleanup will never run because the 'close' event is emitted synchronously (line 336) while the resolver is still executing, before runScheduledCleanupsexhaustCleanups can attach the listener (line 210).

The cleanup binding must be per-connection. One option is to track close state on the client (the #closed deferred already exists) and pass a per-connection emitter or close-promise into the resolver context, running super.exhaustCleanups(cleanups) when that promise resolves — handling both "already closed" (run immediately) and "closes later" (await) cases.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/core/sse.ts` around lines 200 - 211, The current exhaustCleanups
registers listeners on the shared `#emitter` which causes cross-connection firing
and misses synchronous closes; change it to use a per-connection close signal
instead of `#emitter`: create a per-connection promise/emitter (or use the
existing `#closed` deferred tied to this client instance) and, inside
exhaustCleanups, if that per-connection closed is already resolved call
super.exhaustCleanups(cleanups) immediately, otherwise attach a single
per-connection listener (or await the per-connection close promise) to invoke
super.exhaustCleanups(cleanups) when that specific connection closes; remove
usage of the shared `#emitter` for per-connection cleanup wiring to avoid
cross-fire and race conditions when resolvers call client.close() synchronously.

Comment thread src/mockServiceWorker.js
@kettanaito kettanaito marked this pull request as draft May 7, 2026 20:14
@piotr-cz
Copy link
Copy Markdown
Contributor

piotr-cz commented May 8, 2026

Tested and works as expected:

The finalize callback is executed when sse stream is aborted:

  • when app aborts stream
  • with MSW's ServerSentEventClient.close()
  • with MSW's ServerSentEventClient.error()

@kettanaito kettanaito changed the title fix(sse): invoke finalize on response stream end feat(sse): invoke finalize on response stream end May 12, 2026
@kettanaito kettanaito added the BREAKING CHANGE Pull request introducing breaking changes. label May 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

BREAKING CHANGE Pull request introducing breaking changes.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support onCleanup callback to schedule side effect cleanup when resolvers are finished

2 participants