forked from anomalyco/opencode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsse.ts
More file actions
66 lines (59 loc) · 1.71 KB
/
sse.ts
File metadata and controls
66 lines (59 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
export async function parseSSE(
body: ReadableStream<Uint8Array>,
signal: AbortSignal,
onEvent: (event: unknown) => void,
) {
const reader = body.getReader()
const decoder = new TextDecoder()
let buf = ""
let last = ""
let retry = 1000
const abort = () => {
void reader.cancel().catch(() => undefined)
}
signal.addEventListener("abort", abort)
try {
while (!signal.aborted) {
const chunk = await reader.read().catch(() => ({ done: true, value: undefined as Uint8Array | undefined }))
if (chunk.done) break
buf += decoder.decode(chunk.value, { stream: true })
buf = buf.replace(/\r\n/g, "\n").replace(/\r/g, "\n")
const chunks = buf.split("\n\n")
buf = chunks.pop() ?? ""
chunks.forEach((chunk) => {
const data: string[] = []
chunk.split("\n").forEach((line) => {
if (line.startsWith("data:")) {
data.push(line.replace(/^data:\s*/, ""))
return
}
if (line.startsWith("id:")) {
last = line.replace(/^id:\s*/, "")
return
}
if (line.startsWith("retry:")) {
const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10)
if (!Number.isNaN(parsed)) retry = parsed
}
})
if (!data.length) return
const raw = data.join("\n")
try {
onEvent(JSON.parse(raw))
} catch {
onEvent({
type: "sse.message",
properties: {
data: raw,
id: last || undefined,
retry,
},
})
}
})
}
} finally {
signal.removeEventListener("abort", abort)
reader.releaseLock()
}
}