11import { afterEach , expect } from "bun:test"
2- import { Cause , Effect , Exit , Fiber , Layer } from "effect"
2+ import { Cause , Effect , Exit , Fiber , Layer , Queue } from "effect"
33import { Question } from "../../src/question"
44import { Instance } from "../../src/project/instance"
55import { InstanceRuntime } from "../../src/project/instance-runtime"
@@ -8,8 +8,11 @@ import { disposeAllInstances, provideInstance, reloadTestInstance, tmpdirScoped
88import { SessionID } from "../../src/session/schema"
99import { testEffect } from "../lib/effect"
1010import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
11+ import { Bus } from "../../src/bus"
1112
12- const it = testEffect ( Layer . mergeAll ( Question . defaultLayer , CrossSpawnSpawner . defaultLayer ) )
13+ const it = testEffect (
14+ Layer . mergeAll ( Question . layer . pipe ( Layer . provideMerge ( Bus . layer ) ) , CrossSpawnSpawner . defaultLayer ) ,
15+ )
1316
1417const askEffect = Effect . fn ( "QuestionTest.ask" ) ( function * ( input : {
1518 sessionID : SessionID
@@ -44,15 +47,19 @@ const rejectAll = Effect.gen(function* () {
4447 yield * Effect . forEach ( yield * listEffect , ( req ) => rejectEffect ( req . id ) , { discard : true } )
4548} )
4649
47- const waitForPending = ( count : number ) =>
48- Effect . gen ( function * ( ) {
49- for ( let i = 0 ; i < 100 ; i ++ ) {
50- const pending = yield * listEffect
51- if ( pending . length === count ) return pending
52- yield * Effect . sleep ( "10 millis" )
53- }
54- return yield * Effect . fail ( new Error ( `timed out waiting for ${ count } pending question request(s)` ) )
55- } )
50+ const waitForPending = Effect . fn ( "QuestionTest.waitForPending" ) ( function * ( count : number ) {
51+ const question = yield * Question . Service
52+ const bus = yield * Bus . Service
53+ const asked = yield * Queue . unbounded < void > ( )
54+ const off = yield * bus . subscribeCallback ( Question . Event . Asked , ( ) => Queue . offerUnsafe ( asked , undefined ) )
55+ yield * Effect . addFinalizer ( ( ) => Effect . sync ( off ) )
56+
57+ for ( ; ; ) {
58+ const pending = yield * question . list ( )
59+ if ( pending . length === count ) return pending
60+ yield * Queue . take ( asked ) . pipe ( Effect . timeout ( "2 seconds" ) )
61+ }
62+ } )
5663
5764it . instance (
5865 "ask - remains pending until answered" ,
0 commit comments