fix(bus): acquire PubSub subscription eagerly to close /event race#27959
Open
kitlangton wants to merge 1 commit into
Open
fix(bus): acquire PubSub subscription eagerly to close /event race#27959kitlangton wants to merge 1 commit into
kitlangton wants to merge 1 commit into
Conversation
`bus.subscribe(def)` and `bus.subscribeAll()` previously returned a `Stream` whose underlying `PubSub.subscribe` ran lazily on first pull (via `Stream.unwrap`). When a Stream was built in one place and consumed in another — e.g. the `/event` SSE handler, which returns the stream inside `HttpServerResponse.stream` for the body-pump fiber to consume later — any publish in the hand-off window was lost. The `/event` handler's `Stream.concat(server.connected, events)` shape made this concrete: the events stream's PubSub.subscribe was only run *after* `server.connected` was emitted and flushed to the socket, so publishes that landed between the client receiving `server.connected` and the body-pump fiber pulling from `events` were silently dropped. SDK consumers (`client.event.subscribe()`, the Slack bot, `opencode run --attach`) hit this routinely because they typically subscribe and then trigger something that publishes (e.g. `sdk.part.update`). Change the bus interface so subscribe / subscribeAll return `Effect<Stream, never, Scope>`. The subscription is now acquired eagerly when the caller yields the effect, lives in the caller's scope, and is released by the scope's finalizer. Any publish after `yield*` is buffered into the subscription queue regardless of when the consumer activates. Migrated callers: - /event handler (handlers/event.ts) - plugin/index.ts - project/project.ts - project/vcs.ts - share/share-next.ts Regression tests added: - test/bus/bus-effect.test.ts — 3 unit tests for eager subscribe, including the /event-shape Stream.concat pattern. - test/server/httpapi-event-diagnostics.test.ts — 7 diagnostic tests (D1-D7) isolating each variable in the publisher chain. D7 (no-op AppRuntime warmup) is the smallest regression trigger. - test/server/httpapi-sdk.test.ts — end-to-end SDK subscription through /event during a sync.run-driven publish.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
bus.subscribe(def)andbus.subscribeAll()returned aStreamwhose underlyingPubSub.subscriberan lazily on first pull (viaStream.unwrap). When a Stream was built in one place and consumed in another — e.g. the/eventSSE handler returning the stream to be pumped by the response-body fiber — any publish in the hand-off window was silently lost.The
/eventhandler'sStream.concat(server.connected, events)shape made this concrete:eventswas subscribed afterserver.connectedwas flushed to the socket. SDK consumers that subscribe and immediately trigger a publish (sdk.part.update, etc.) hit this routinely. This is the root cause of the original `/event` regression that #27425 partially fixed.Change
Replace `subscribe`/`subscribeAll` with their `Effect<Stream, never, Scope>` variants. The subscription is acquired eagerly at `yield*` time, lives in the caller's scope, and is released by the scope's finalizer. Any publish after the `yield*` is queued in the subscription regardless of when the consumer activates.
Why this shape: `PubSub.subscribe` in effect-smol is already `Effect<Subscription, never, Scope>`. Our wrapper was hiding that requirement behind `Stream.unwrap`, which made the scope dependency invisible to both the type system and the reader. Using the canonical primitive aligns with effect-smol's idiom and makes the time-sensitive nature of subscription explicit.
Migrations
All call sites are inside `Effect.gen(...)` with a Scope already in context (they all `forkScoped` or run in a layer effect), so the migration is mechanical: `bus.subscribe(def).pipe(...)` → `(yield* bus.subscribe(def)).pipe(...)`.
Tests
7 new failing tests on dev, all green after this change:
Bus-level (`test/bus/bus-effect.test.ts`)
Handler-level (`test/server/httpapi-event-diagnostics.test.ts`)
End-to-end (`test/server/httpapi-sdk.test.ts`)
Test plan
Related