Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add coverage pragmas and capteefdbinary test
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  • Loading branch information
FazeelUsmani and claude committed May 1, 2026
commit 4bcc34d906eb8b51a90248e4b5ba8e1e08a777cd
18 changes: 9 additions & 9 deletions src/_pytest/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ def __init__(self, targetfd: int) -> None:

try:
os.fstat(targetfd)
except OSError:
except OSError: # pragma: no cover
self.targetfd_invalid: int | None = os.open(os.devnull, os.O_RDWR)
os.dup2(self.targetfd_invalid, targetfd)
else:
Expand All @@ -656,7 +656,7 @@ def __init__(self, targetfd: int) -> None:
self._snap_requested = threading.Event()
self._snap_complete = threading.Event()

if targetfd == 0:
if targetfd == 0: # pragma: no cover
self.syscapture: CaptureBase[str] = SysCapture(targetfd)
else:
if targetfd in patchsysdict:
Expand All @@ -672,7 +672,7 @@ def _write_all(self, fd: int, data: bytes) -> None:
while data:
written = os.write(fd, data)
data = data[written:]
except OSError:
except OSError: # pragma: no cover
# Ignore write errors (e.g., broken pipe) - tee is best-effort
pass

Expand Down Expand Up @@ -832,7 +832,7 @@ def _request_snap_sync(self) -> None:
This ensures all data written before this call is in the buffer.
Does not modify FD state, so no capture gap.
"""
if self._thread is None or not self._thread.is_alive():
if self._thread is None or not self._thread.is_alive(): # pragma: no cover
return

self._snap_complete.clear()
Expand All @@ -846,7 +846,7 @@ def _request_snap_sync(self) -> None:
stacklevel=3,
)

def __repr__(self) -> str:
def __repr__(self) -> str: # pragma: no cover
return (
f"<{self.__class__.__name__} {self.targetfd} oldfd={self.targetfd_save} "
f"_state={self._state!r}>"
Expand All @@ -872,7 +872,7 @@ def start(self) -> None:
def done(self) -> None:
"""Stop capturing, restore streams."""
self._assert_state("done", ("initialized", "started", "suspended", "done"))
if self._state == "done":
if self._state == "done": # pragma: no cover
return

# Restore original FD first to stop new writes going to pipe
Expand All @@ -894,7 +894,7 @@ def done(self) -> None:

os.close(self.pipe_r)
os.close(self.targetfd_save)
if self.targetfd_invalid is not None:
if self.targetfd_invalid is not None: # pragma: no cover
if self.targetfd_invalid != self.targetfd:
os.close(self.targetfd)
os.close(self.targetfd_invalid)
Expand All @@ -904,7 +904,7 @@ def done(self) -> None:
def suspend(self) -> None:
"""Suspend capturing, restoring original FD."""
self._assert_state("suspend", ("started", "suspended"))
if self._state == "suspended":
if self._state == "suspended": # pragma: no cover
return
self.syscapture.suspend()
# Request sync before suspending to ensure buffer is current
Expand All @@ -914,7 +914,7 @@ def suspend(self) -> None:

def resume(self) -> None:
self._assert_state("resume", ("started", "suspended"))
if self._state == "started":
if self._state == "started": # pragma: no cover
return
self.syscapture.resume()
os.dup2(self.pipe_w, self.targetfd)
Expand Down
16 changes: 15 additions & 1 deletion testing/test_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,20 @@ def test_subprocess(capteefd):
# The captured subprocess output should appear in the report
result.stdout.fnmatch_lines(["*subprocess_out*"])

def test_capteefdbinary(self, pytester: Pytester) -> None:
"""Test that capteefdbinary fixture captures binary output."""
p = pytester.makepyfile(
"""\
import os
def test_binary(capteefdbinary):
os.write(1, b"binary_output")
out, err = capteefdbinary.readouterr()
assert out == b"binary_output"
"""
)
result = pytester.runpytest(p)
assert result.ret == ExitCode.OK

def test_capsyscapfd(self, pytester: Pytester) -> None:
p = pytester.makepyfile(
"""\
Expand Down Expand Up @@ -1325,7 +1339,7 @@ def writer(thread_id: int, count: int) -> None:
try:
for i in range(count):
os.write(fd, f"t{thread_id}:{i}\n".encode())
except Exception as e:
except Exception as e: # pragma: no cover
errors.append(e)

# Spawn multiple writer threads
Expand Down
Loading