Skip to content

Commit cc222db

Browse files
author
Haider
committed
fix: replace timeout race with first-byte gate + add full edge coverage
Previous fix (a387257) replaced the unbounded `Bun.stdin.text()` call with a `Promise.race` against a 100ms timer. PR #935 reviewer flagged a silent stdin-drop regression; a consensus self-review found two more regressions introduced by the race: the orphaned read kept fd 0 open (`~1s` exit delay verified on Bun 1.3.11), and the whole-stream timeout silently truncated slow-but-legitimate piped producers. This commit replaces the race with a first-byte gate over `process.stdin` events. The timeout now caps "time until first readable byte," not full drain — so producers that flush within the window are not truncated, and the no-data path explicitly removes listeners and `unref`s the stream so an idle inherited fd no longer pins process exit. Other changes: - Accept socket-backed stdin (`isSocket()`) — was a silent narrowing. - Use `Pick<fs.Stats, ...>` instead of a hand-rolled `Stat` type. - Move helper from `src/cli/cmd/run-stdin.ts` to `src/util/stdin.ts`. - Extract `assembleStdinMessage` as a pure function so the positional + stdin concatenation case from PR #935 is unit-testable. - 4 spawn-based E2E tests for the actual fd-0 wedge / pipe paths.
1 parent a387257 commit cc222db

5 files changed

Lines changed: 424 additions & 10 deletions

File tree

packages/opencode/src/cli/cmd/run.ts

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { TodoWriteTool } from "../../tool/todo"
2929
import { Locale } from "../../util/locale"
3030
import { Tracer, FileExporter, HttpExporter, type TraceExporter } from "../../altimate/observability/tracing"
3131
import { Config } from "../../config/config"
32+
import { readStdinIfAvailable, assembleStdinMessage } from "../../util/stdin"
3233

3334
type ToolProps<T extends Tool.Info> = {
3435
input: Tool.InferParameters<T>
@@ -430,16 +431,9 @@ export const RunCommand = cmd({
430431
message = [extractedParts.join("\n\n"), message].filter(Boolean).join("\n\n")
431432
}
432433

433-
// Only read stdin when no positional message was provided. The original
434-
// unconditional `!isTTY` guard matched a non-TTY parent process even when
435-
// it inherited (but never closed) stdin — `Bun.stdin.text()` would then
436-
// wait forever for an EOF that never arrives, producing a silent 0% CPU
437-
// hang for any subprocess / CI / agent caller that passed a positional
438-
// message. Positional-overrides-stdin matches conventional CLI semantics;
439-
// pipe-only invocations without a positional arg still work as before.
440-
if (!process.stdin.isTTY && message.trim().length === 0) {
441-
message += "\n" + (await Bun.stdin.text())
442-
}
434+
// Read piped/redirected stdin without wedging on inherited-but-idle fds.
435+
// See `src/util/stdin.ts` for the failure mode and the first-byte-race fix.
436+
message = assembleStdinMessage(message, await readStdinIfAvailable())
443437

444438
if (message.trim().length === 0 && !args.command) {
445439
UI.error("You must provide a message or a command")
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import fs from "fs"
2+
import type { Stats } from "fs"
3+
4+
const FIRST_BYTE_TIMEOUT_MS = 100
5+
6+
type Stat = Pick<Stats, "isFIFO" | "isFile" | "isSocket">
7+
8+
export interface ReadStdinDeps {
9+
isTTY?: boolean
10+
fstat?: () => Stat
11+
// Returns "" if no first byte arrives within timeoutMs; otherwise drains
12+
// stdin to EOF and returns the full content. Default implementation uses
13+
// process.stdin events so that a wedged-after-first-byte case still has a
14+
// well-defined behavior (waits for `end`); callers can inject a faster
15+
// implementation in tests.
16+
readStdin?: (timeoutMs: number) => Promise<string>
17+
timeoutMs?: number
18+
}
19+
20+
// Read piped/redirected stdin without wedging on an inherited-but-idle fd.
21+
//
22+
// The failure mode this guards against: subprocess callers (Claude Code's
23+
// Bash tool, Python `subprocess.run(..., stdin=None)`, CI, plugin hosts)
24+
// leave stdin attached to a parent pipe that is never written to and never
25+
// closed. A blind `Bun.stdin.text()` waits forever for an EOF that never
26+
// arrives.
27+
//
28+
// Strategy — two gates:
29+
//
30+
// 1. fstat gate: only FIFOs (pipes), regular files (redirects), and
31+
// sockets (process supervisors, socket activation, `nc -l`) can carry
32+
// real input. TTYs and character devices (e.g. `< /dev/null`) skip.
33+
//
34+
// 2. First-byte timeout: instead of bounding the whole-stream drain, we
35+
// wait up to `timeoutMs` for the first readable byte. If no byte
36+
// arrives in that window, we treat stdin as inherited-idle and skip.
37+
// If a byte arrives, we drain to EOF without further deadline — so a
38+
// slow producer that takes >100ms total but flushes its first chunk
39+
// within the window is not truncated. This avoids the two pitfalls of
40+
// a whole-stream race: (a) the orphaned `Bun.stdin.text()` continuing
41+
// to hold fd 0 open after the loser is abandoned, and (b) silent
42+
// mid-stream truncation of legitimate slow / large producers.
43+
export async function readStdinIfAvailable(deps: ReadStdinDeps = {}): Promise<string> {
44+
const isTTY = deps.isTTY ?? Boolean(process.stdin.isTTY)
45+
const fstat = deps.fstat ?? (() => fs.fstatSync(0) as Stat)
46+
const readStdin = deps.readStdin ?? defaultReadStdin
47+
const timeoutMs = deps.timeoutMs ?? FIRST_BYTE_TIMEOUT_MS
48+
49+
if (isTTY) return ""
50+
51+
try {
52+
const stat = fstat()
53+
if (!stat.isFIFO() && !stat.isFile() && !stat.isSocket()) return ""
54+
} catch {
55+
return ""
56+
}
57+
58+
return readStdin(timeoutMs)
59+
}
60+
61+
// Compose the final prompt from a positional message and stdin input.
62+
// Extracted as a pure function so the regression case from PR #935
63+
// (`echo ctx | run "prompt"` must concatenate, not silently drop ctx) can
64+
// be unit-tested without spawning the full run command.
65+
export function assembleStdinMessage(positional: string, stdinInput: string): string {
66+
if (stdinInput.trim().length === 0) return positional
67+
if (positional.length === 0) return stdinInput
68+
return positional + "\n" + stdinInput
69+
}
70+
71+
// Default implementation of the first-byte race over `process.stdin`.
72+
//
73+
// Why not `Bun.stdin.text()`: that reads the entire stream as a single
74+
// uncancellable Promise. If we race it against a timer and the timer wins,
75+
// the read still holds fd 0 open until the producer eventually closes,
76+
// blocking process exit (the original wedge moved to teardown).
77+
//
78+
// Using `process.stdin` event listeners lets us:
79+
// - bind the timeout to "first byte" rather than "full drain", so slow
80+
// producers and large payloads aren't truncated;
81+
// - cleanly remove our listeners and `unref` the stream on the no-data
82+
// path, so an inherited-open fd doesn't pin the event loop.
83+
function defaultReadStdin(timeoutMs: number): Promise<string> {
84+
return new Promise<string>((resolve) => {
85+
const stdin = process.stdin
86+
const chunks: Buffer[] = []
87+
let firstByteReceived = false
88+
let firstByteTimer: ReturnType<typeof setTimeout> | undefined
89+
let settled = false
90+
91+
const cleanup = () => {
92+
stdin.off("data", onData)
93+
stdin.off("end", onEnd)
94+
stdin.off("error", onError)
95+
if (firstByteTimer) clearTimeout(firstByteTimer)
96+
try {
97+
stdin.pause()
98+
} catch {}
99+
try {
100+
stdin.unref?.()
101+
} catch {}
102+
}
103+
104+
const settle = (result: string) => {
105+
if (settled) return
106+
settled = true
107+
cleanup()
108+
resolve(result)
109+
}
110+
111+
const onData = (chunk: Buffer) => {
112+
if (!firstByteReceived) {
113+
firstByteReceived = true
114+
if (firstByteTimer) {
115+
clearTimeout(firstByteTimer)
116+
firstByteTimer = undefined
117+
}
118+
}
119+
chunks.push(chunk)
120+
}
121+
const onEnd = () => settle(Buffer.concat(chunks).toString("utf8"))
122+
const onError = () => settle(Buffer.concat(chunks).toString("utf8"))
123+
124+
firstByteTimer = setTimeout(() => {
125+
if (!firstByteReceived) settle("")
126+
}, timeoutMs)
127+
128+
stdin.on("data", onData)
129+
stdin.on("end", onEnd)
130+
stdin.on("error", onError)
131+
try {
132+
stdin.resume()
133+
} catch {
134+
settle("")
135+
}
136+
})
137+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import { describe, expect, test } from "bun:test"
2+
import path from "path"
3+
4+
// The fixture imports the real helper and prints { result, elapsed } as JSON.
5+
// Spawning it lets us exercise the actual `process.stdin` event path against
6+
// real fd 0 conditions — something dependency injection can't cover.
7+
const FIXTURE = path.join(__dirname, "stdin-fixture.ts")
8+
9+
type FileSink = { write(chunk: string | Uint8Array): number; end(): Promise<number> | number }
10+
11+
async function runFixture(opts: {
12+
writeStdin?: (sink: FileSink) => Promise<void>
13+
killAfterMs?: number
14+
}): Promise<{ code: number | null; result?: string; elapsed?: number; stdout: string; stderr: string }> {
15+
const proc = Bun.spawn(["bun", "run", FIXTURE], {
16+
stdin: "pipe",
17+
stdout: "pipe",
18+
stderr: "pipe",
19+
})
20+
21+
if (opts.writeStdin) {
22+
await opts.writeStdin(proc.stdin as unknown as FileSink)
23+
}
24+
25+
let killTimer: ReturnType<typeof setTimeout> | undefined
26+
if (opts.killAfterMs) {
27+
killTimer = setTimeout(() => proc.kill(), opts.killAfterMs)
28+
}
29+
const code = await proc.exited
30+
if (killTimer) clearTimeout(killTimer)
31+
32+
const stdout = await new Response(proc.stdout).text()
33+
const stderr = await new Response(proc.stderr).text()
34+
35+
try {
36+
const parsed = JSON.parse(stdout)
37+
return { code, result: parsed.result, elapsed: parsed.elapsed, stdout, stderr }
38+
} catch {
39+
return { code, stdout, stderr }
40+
}
41+
}
42+
43+
describe("readStdinIfAvailable (spawned subprocess)", () => {
44+
// M1-regression — the canonical wedge: an inherited pipe that's never
45+
// written to and never closed. Pre-fix this hung forever; with the
46+
// previous Promise.race fix, the await released at 100ms but fd 0 stayed
47+
// open until the parent closed. With the first-byte event race + unref,
48+
// the child exits promptly.
49+
test(
50+
"exits promptly with empty result when stdin is an inherited-but-idle pipe",
51+
async () => {
52+
const { code, result, elapsed } = await runFixture({
53+
// Don't write — leave the pipe open and silent. Close after 1s so
54+
// the parent's writer isn't garbage-collected; by then the child
55+
// should have already exited via the first-byte timeout.
56+
writeStdin: async (sink) => {
57+
await new Promise((r) => setTimeout(r, 1000))
58+
try {
59+
await sink.end()
60+
} catch {}
61+
},
62+
killAfterMs: 5000,
63+
})
64+
expect(code).toBe(0)
65+
expect(result).toBe("")
66+
expect(elapsed).toBeLessThan(500)
67+
},
68+
10000,
69+
)
70+
71+
// MAJOR-regression from PR #935: `echo ctx | run "prompt"` must still
72+
// deliver "ctx". Verified here by writing data + closing the pipe.
73+
test(
74+
"returns piped data when producer writes and closes",
75+
async () => {
76+
const { code, result } = await runFixture({
77+
writeStdin: async (sink) => {
78+
sink.write("context data")
79+
await sink.end()
80+
},
81+
})
82+
expect(code).toBe(0)
83+
expect(result).toBe("context data")
84+
},
85+
10000,
86+
)
87+
88+
// M2-regression: a producer that takes >100ms to flush first byte was
89+
// truncated by the old Promise.race timeout. The first-byte gate must
90+
// accept the byte once it arrives and then drain the rest without a
91+
// deadline.
92+
test(
93+
"preserves data from slow producer (first byte arrives just before timeout)",
94+
async () => {
95+
const { code, result } = await runFixture({
96+
writeStdin: async (sink) => {
97+
// Sleep close to but under the 100ms first-byte budget, then
98+
// flush. The whole-stream-race fix would have returned "".
99+
await new Promise((r) => setTimeout(r, 60))
100+
sink.write("slow ctx")
101+
await sink.end()
102+
},
103+
})
104+
expect(code).toBe(0)
105+
expect(result).toBe("slow ctx")
106+
},
107+
10000,
108+
)
109+
110+
// Negative control: a producer slow enough to miss the first-byte window
111+
// should yield "" (intentional cutoff, no truncation of in-flight data).
112+
test(
113+
"returns empty when first byte arrives after the first-byte timeout",
114+
async () => {
115+
const { code, result } = await runFixture({
116+
writeStdin: async (sink) => {
117+
await new Promise((r) => setTimeout(r, 400))
118+
try {
119+
sink.write("too late")
120+
await sink.end()
121+
} catch {}
122+
},
123+
})
124+
expect(code).toBe(0)
125+
expect(result).toBe("")
126+
},
127+
10000,
128+
)
129+
})
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Subprocess fixture used by stdin-e2e.test.ts.
2+
// Imports the real helper, invokes it against real fd 0, and prints the
3+
// result as JSON so the test can assert on it.
4+
import { readStdinIfAvailable } from "../../src/util/stdin"
5+
6+
const start = Date.now()
7+
const result = await readStdinIfAvailable()
8+
const elapsed = Date.now() - start
9+
process.stdout.write(JSON.stringify({ result, elapsed }))

0 commit comments

Comments
 (0)