Skip to content

Commit 567a911

Browse files
authored
refactor(session): simplify LLM stream by replacing queue with fromAsyncIterable (anomalyco#20324)
1 parent 434d82b commit 567a911

1 file changed

Lines changed: 5 additions & 15 deletions

File tree

  • packages/opencode/src/session

packages/opencode/src/session/llm.ts

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,32 +53,22 @@ export namespace LLM {
5353
Effect.gen(function* () {
5454
return Service.of({
5555
stream(input) {
56-
const stream: Stream.Stream<Event, unknown> = Stream.scoped(
56+
return Stream.scoped(
5757
Stream.unwrap(
5858
Effect.gen(function* () {
5959
const ctrl = yield* Effect.acquireRelease(
6060
Effect.sync(() => new AbortController()),
6161
(ctrl) => Effect.sync(() => ctrl.abort()),
6262
)
63-
const queue = yield* Queue.unbounded<Event, unknown | Cause.Done>()
6463

65-
yield* Effect.promise(async () => {
66-
const result = await LLM.stream({ ...input, abort: ctrl.signal })
67-
for await (const event of result.fullStream) {
68-
if (!Queue.offerUnsafe(queue, event)) break
69-
}
70-
Queue.endUnsafe(queue)
71-
}).pipe(
72-
Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))),
73-
Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())),
74-
Effect.forkScoped,
75-
)
64+
const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal }))
7665

77-
return Stream.fromQueue(queue)
66+
return Stream.fromAsyncIterable(result.fullStream, (e) =>
67+
e instanceof Error ? e : new Error(String(e)),
68+
)
7869
}),
7970
),
8071
)
81-
return stream
8272
},
8373
})
8474
}),

0 commit comments

Comments
 (0)