Skip to content

fix(bus): acquire PubSub subscription eagerly to close /event race#27959

Open
kitlangton wants to merge 1 commit into
devfrom
worktree-sync-bridge-cleanup
Open

fix(bus): acquire PubSub subscription eagerly to close /event race#27959
kitlangton wants to merge 1 commit into
devfrom
worktree-sync-bridge-cleanup

Conversation

@kitlangton
Copy link
Copy Markdown
Contributor

Summary

bus.subscribe(def) and bus.subscribeAll() returned a Stream whose underlying PubSub.subscribe ran lazily on first pull (via Stream.unwrap). When a Stream was built in one place and consumed in another — e.g. the /event SSE handler returning the stream to be pumped by the response-body fiber — any publish in the hand-off window was silently lost.

The /event handler's Stream.concat(server.connected, events) shape made this concrete: events was subscribed after server.connected was flushed to the socket. SDK consumers that subscribe and immediately trigger a publish (sdk.part.update, etc.) hit this routinely. This is the root cause of the original `/event` regression that #27425 partially fixed.

Change

Replace `subscribe`/`subscribeAll` with their `Effect<Stream, never, Scope>` variants. The subscription is acquired eagerly at `yield*` time, lives in the caller's scope, and is released by the scope's finalizer. Any publish after the `yield*` is queued in the subscription regardless of when the consumer activates.

Why this shape: `PubSub.subscribe` in effect-smol is already `Effect<Subscription, never, Scope>`. Our wrapper was hiding that requirement behind `Stream.unwrap`, which made the scope dependency invisible to both the type system and the reader. Using the canonical primitive aligns with effect-smol's idiom and makes the time-sensitive nature of subscription explicit.

Migrations

  • `src/server/routes/instance/httpapi/handlers/event.ts`
  • `src/plugin/index.ts`
  • `src/project/project.ts`
  • `src/project/vcs.ts`
  • `src/share/share-next.ts`

All call sites are inside `Effect.gen(...)` with a Scope already in context (they all `forkScoped` or run in a layer effect), so the migration is mechanical: `bus.subscribe(def).pipe(...)` → `(yield* bus.subscribe(def)).pipe(...)`.

Tests

7 new failing tests on dev, all green after this change:

Bus-level (`test/bus/bus-effect.test.ts`)

  • `eager subscribe: publish after yield* is delivered without consumer-activation race`
  • `eager subscribeAll: publish after yield* is delivered`
  • `eager subscribe: Stream.concat(initial, subscribe) delivers publish during prefix` — the unit-level mirror of the `/event` shape

Handler-level (`test/server/httpapi-event-diagnostics.test.ts`)

  • D1–D7. D7 is the smallest reproducer: a no-op `AppRuntime.runPromise(Effect.void)` before opening `/event` was enough to expose the race on dev.

End-to-end (`test/server/httpapi-sdk.test.ts`)

  • `streams sync-backed part updates to /event subscribers` — full SDK → `/event` → `Session.updatePart` → `sync.run` → `bus.publish` → SDK consumer.

Test plan

  • `bun run test test/bus/bus-effect.test.ts` — green
  • `bun run test test/server/httpapi-event-diagnostics.test.ts` — green
  • `bun run test test/server/httpapi-event.test.ts` — green
  • `bun run test test/server/httpapi-sdk.test.ts -t "streams sync-backed"` — green
  • `bun run typecheck` — clean
  • Full `bun run test` — 2695 pass, 1 fail (pre-existing FileWatcher symlink test that also fails on dev without these changes; unrelated)

Related

`bus.subscribe(def)` and `bus.subscribeAll()` previously returned a
`Stream` whose underlying `PubSub.subscribe` ran lazily on first pull
(via `Stream.unwrap`). When a Stream was built in one place and consumed
in another — e.g. the `/event` SSE handler, which returns the stream
inside `HttpServerResponse.stream` for the body-pump fiber to consume
later — any publish in the hand-off window was lost.

The `/event` handler's `Stream.concat(server.connected, events)` shape
made this concrete: the events stream's PubSub.subscribe was only run
*after* `server.connected` was emitted and flushed to the socket, so
publishes that landed between the client receiving `server.connected`
and the body-pump fiber pulling from `events` were silently dropped.
SDK consumers (`client.event.subscribe()`, the Slack bot,
`opencode run --attach`) hit this routinely because they typically
subscribe and then trigger something that publishes (e.g.
`sdk.part.update`).

Change the bus interface so subscribe / subscribeAll return
`Effect<Stream, never, Scope>`. The subscription is now acquired
eagerly when the caller yields the effect, lives in the caller's
scope, and is released by the scope's finalizer. Any publish after
`yield*` is buffered into the subscription queue regardless of when
the consumer activates.

Migrated callers:
- /event handler (handlers/event.ts)
- plugin/index.ts
- project/project.ts
- project/vcs.ts
- share/share-next.ts

Regression tests added:
- test/bus/bus-effect.test.ts — 3 unit tests for eager subscribe,
  including the /event-shape Stream.concat pattern.
- test/server/httpapi-event-diagnostics.test.ts — 7 diagnostic
  tests (D1-D7) isolating each variable in the publisher chain. D7
  (no-op AppRuntime warmup) is the smallest regression trigger.
- test/server/httpapi-sdk.test.ts — end-to-end SDK subscription
  through /event during a sync.run-driven publish.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] SSE Event Stream Closes Immediately After Connection (Effect HTTP API Backend)

1 participant