From c6d0e315565b8a1a0a9e9589f5efe169cb469c08 Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 16:33:15 +0530 Subject: [PATCH 01/11] Add tee-fd capture mode and capteefd fixtures --- doc/en/how-to/capture-stdout-stderr.rst | 8 +- doc/en/reference/fixtures.rst | 8 + doc/en/reference/reference.rst | 23 +- src/_pytest/capture.py | 446 +++++++++++++++++++++++- testing/test_capture.py | 246 +++++++++++++ 5 files changed, 726 insertions(+), 5 deletions(-) diff --git a/doc/en/how-to/capture-stdout-stderr.rst b/doc/en/how-to/capture-stdout-stderr.rst index 5de89bc0e3f..d0be844b650 100644 --- a/doc/en/how-to/capture-stdout-stderr.rst +++ b/doc/en/how-to/capture-stdout-stderr.rst @@ -32,7 +32,7 @@ a test. Setting capturing methods or disabling capturing ------------------------------------------------- -There are three ways in which ``pytest`` can perform capturing: +There are four ways in which ``pytest`` can perform capturing: * ``fd`` (file descriptor) level capturing (default): All writes going to the operating system file descriptors 1 and 2 will be captured. @@ -46,6 +46,11 @@ There are three ways in which ``pytest`` can perform capturing: the actual ``sys.stdout`` and ``sys.stderr``. This allows output to be 'live printed' and captured for plugin use, such as junitxml (new in pytest 5.4). +* ``tee-fd`` capturing: All writes going to file descriptors 1 and 2 will be + captured, including subprocess output. The writes will also be passed-through + to the original file descriptors in real-time. This combines the subprocess + capture ability of ``fd`` with the live output visibility of ``tee-sys``. + .. _`disable capturing`: You can influence output capturing mechanisms from the command line: @@ -57,6 +62,7 @@ You can influence output capturing mechanisms from the command line: pytest --capture=fd # also point filedescriptors 1 and 2 to temp file pytest --capture=tee-sys # combines 'sys' and '-s', capturing sys.stdout/stderr # and passing it along to the actual sys.stdout/stderr + pytest --capture=tee-fd # like 'fd' but also passes output through in real-time .. _printdebugging: diff --git a/doc/en/reference/fixtures.rst b/doc/en/reference/fixtures.rst index c4a8d01ff0e..98f1cd2dce1 100644 --- a/doc/en/reference/fixtures.rst +++ b/doc/en/reference/fixtures.rst @@ -39,6 +39,14 @@ Built-in fixtures :fixture:`capsysbinary` Capture, as bytes, output to ``sys.stdout`` and ``sys.stderr``. + :fixture:`capteefd` + Capture, as text, output to file descriptors 1 and 2, with real-time + pass-through. + + :fixture:`capteefdbinary` + Capture, as bytes, output to file descriptors 1 and 2, with real-time + pass-through. + :fixture:`cache` Store and retrieve values across pytest runs. diff --git a/doc/en/reference/reference.rst b/doc/en/reference/reference.rst index a69aa2c7887..b0f577b9b68 100644 --- a/doc/en/reference/reference.rst +++ b/doc/en/reference/reference.rst @@ -422,6 +422,26 @@ capteesys .. autofunction:: _pytest.capture.capteesys() :no-auto-options: +.. fixture:: capteefd + +capteefd +~~~~~~~~ + +**Tutorial**: :ref:`captures` + +.. autofunction:: _pytest.capture.capteefd() + :no-auto-options: + +.. fixture:: capteefdbinary + +capteefdbinary +~~~~~~~~~~~~~~ + +**Tutorial**: :ref:`captures` + +.. autofunction:: _pytest.capture.capteefdbinary() + :no-auto-options: + .. fixture:: capsysbinary capsysbinary @@ -3111,7 +3131,8 @@ Output Capture * ``fd``: capture at file descriptor level (default) * ``sys``: capture at sys level * ``no``: don't capture output - * ``tee-sys``: capture but also show output on terminal + * ``tee-sys``: capture sys level but also show output on terminal + * ``tee-fd``: capture fd level but also show output on terminal See :ref:`captures`. diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 6d98676be5f..14cb6acb06a 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -14,7 +14,14 @@ import os import sys from tempfile import TemporaryFile +import threading from types import TracebackType +import warnings + +# Platform-specific imports for non-blocking I/O +if sys.platform != "win32": + import fcntl + import select from typing import Any from typing import AnyStr from typing import BinaryIO @@ -43,7 +50,7 @@ from _pytest.reports import CollectReport -_CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] +_CaptureMethod = Literal["fd", "sys", "no", "tee-sys", "tee-fd"] def pytest_addoption(parser: Parser) -> None: @@ -53,8 +60,8 @@ def pytest_addoption(parser: Parser) -> None: action="store", default="fd", metavar="method", - choices=["fd", "sys", "no", "tee-sys"], - help="Per-test capturing method: one of fd|sys|no|tee-sys", + choices=["fd", "sys", "no", "tee-sys", "tee-fd"], + help="Per-test capturing method: one of fd|sys|no|tee-sys|tee-fd", ) group._addoption( # private to use reserved lower-case short option "-s", @@ -601,6 +608,371 @@ def writeorg(self, data: str) -> None: os.write(self.targetfd_save, data.encode("utf-8")) +class FDCaptureTeeBase(CaptureBase[AnyStr]): + """Base class for FD capture with real-time tee using pipe and thread. + + Uses a pipe to intercept FD writes. A background thread reads from the pipe, + writes to both the capture buffer and the original FD for real-time passthrough. + + Key design difference from FDCapture: since output is tee'd in real-time, + writeorg() is a no-op to prevent duplicate output when pop_outerr_to_orig() + replays captured data. + + Synchronization model: + - snap() requests a sync via _snap_requested event + - Thread acknowledges by setting _snap_complete event after draining + - This ensures all data written before snap() is in the buffer + """ + + def __init__(self, targetfd: int) -> None: + self.targetfd = targetfd + + try: + os.fstat(targetfd) + except OSError: + self.targetfd_invalid: int | None = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) + else: + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + # Create a pipe: writes to targetfd go to pipe_w, we read from pipe_r + self.pipe_r, self.pipe_w = os.pipe() + + # Set pipe_r to non-blocking for proper synchronization (Unix only) + if sys.platform != "win32": + flags = fcntl.fcntl(self.pipe_r, fcntl.F_GETFL) + fcntl.fcntl(self.pipe_r, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + # Buffer to collect captured output + self._buffer = io.BytesIO() + self._buffer_lock = threading.Lock() + + # Thread synchronization + self._thread: threading.Thread | None = None + self._shutdown = threading.Event() + self._snap_requested = threading.Event() + self._snap_complete = threading.Event() + + if targetfd == 0: + self.syscapture: CaptureBase[str] = SysCapture(targetfd) + else: + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, tee=True) + else: + self.syscapture = NoCapture(targetfd) + + self._state = "initialized" + + def _write_all(self, fd: int, data: bytes) -> None: + """Write all data to fd, handling partial writes.""" + try: + while data: + written = os.write(fd, data) + data = data[written:] + except OSError: + # Ignore write errors (e.g., broken pipe) - tee is best-effort + pass + + def _tee_thread(self) -> None: + """Background thread that reads from pipe and tees to buffer + original fd. + + Uses non-blocking reads with select/poll for proper synchronization. + Responds to snap requests by draining the pipe and signaling completion. + """ + if sys.platform == "win32": + self._tee_thread_windows() + else: + self._tee_thread_unix() + + def _tee_thread_unix(self) -> None: + """Unix implementation using select() for non-blocking reads.""" + while not self._shutdown.is_set(): + # Check for snap request + if self._snap_requested.is_set(): + # Drain all available data + self._drain_nonblocking() + self._snap_requested.clear() + self._snap_complete.set() + + # Wait for data with timeout + try: + readable, _, _ = select.select([self.pipe_r], [], [], 0.01) + if readable: + try: + data = os.read(self.pipe_r, 32768) + if not data: + break # EOF + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except BlockingIOError: + pass # No data available + except OSError: + break + except (ValueError, OSError): + break # Pipe closed + + # Final drain on shutdown + self._drain_nonblocking() + # Signal any pending snap + if self._snap_requested.is_set(): + self._snap_requested.clear() + self._snap_complete.set() + + def _tee_thread_windows(self) -> None: + """Windows implementation using a queue for non-blocking communication. + + On Windows, select() doesn't work on pipes and os.read() blocks indefinitely. + We use a separate reader thread that pushes data to a queue, allowing the + main tee thread to respond to snap requests without blocking. + """ + import queue + + data_queue: queue.Queue[bytes | None] = queue.Queue() + reader_shutdown = threading.Event() + + def reader_thread() -> None: + """Helper thread that performs blocking reads and queues data.""" + try: + while not reader_shutdown.is_set(): + try: + data = os.read(self.pipe_r, 32768) + if not data: + data_queue.put(None) # EOF signal + break + data_queue.put(data) + except OSError: + data_queue.put(None) # Error signal + break + except Exception: + data_queue.put(None) + + reader = threading.Thread(target=reader_thread, daemon=True) + reader.start() + + try: + while not self._shutdown.is_set(): + # Check for snap request + if self._snap_requested.is_set(): + # Wait a bit for any in-flight reads to complete and be queued. + # This handles the race where snap() is called immediately after + # write() but before the reader thread has read the data. + # We do multiple short waits with queue checks to minimize latency. + for _ in range(5): # Up to 50ms total + try: + data = data_queue.get(timeout=0.01) + if data is None: + break + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except queue.Empty: + pass + # Final non-blocking drain + self._drain_queue_nonblocking(data_queue) + self._snap_requested.clear() + self._snap_complete.set() + + # Get data from queue with timeout to stay responsive + try: + data = data_queue.get(timeout=0.01) + if data is None: + break # EOF or error + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except queue.Empty: + pass # No data available, continue loop + + # Final drain on shutdown + self._drain_queue_nonblocking(data_queue) + finally: + reader_shutdown.set() + # Reader thread will exit when pipe is closed or on next read + + # Signal any pending snap + if self._snap_requested.is_set(): + self._snap_requested.clear() + self._snap_complete.set() + + def _drain_queue_nonblocking(self, data_queue: "queue.Queue[bytes | None]") -> None: + """Drain all immediately available data from queue (non-blocking).""" + import queue + + while True: + try: + data = data_queue.get_nowait() + if data is None: + break # EOF marker + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except queue.Empty: + break # No more data available + + def _drain_nonblocking(self) -> None: + """Drain all immediately available data from pipe (non-blocking).""" + while True: + try: + data = os.read(self.pipe_r, 32768) + if not data: + break + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except BlockingIOError: + break # No more data available + except OSError: + break + + def _request_snap_sync(self) -> None: + """Request thread to drain pipe and wait for completion. + + This ensures all data written before this call is in the buffer. + Does not modify FD state, so no capture gap. + """ + if self._thread is None or not self._thread.is_alive(): + return + + self._snap_complete.clear() + self._snap_requested.set() + + # Wait for thread to acknowledge (with timeout) + if not self._snap_complete.wait(timeout=1.0): + warnings.warn( + f"FDCaptureTee snap sync for fd {self.targetfd} timed out. " + "Some captured output may be incomplete.", + stacklevel=3, + ) + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__} {self.targetfd} oldfd={self.targetfd_save} " + f"_state={self._state!r}>" + ) + + def _assert_state(self, op: str, states: tuple[str, ...]) -> None: + assert self._state in states, ( + "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + ) + + def start(self) -> None: + """Start capturing on targetfd using pipe with tee thread.""" + self._assert_state("start", ("initialized",)) + os.dup2(self.pipe_w, self.targetfd) + self._shutdown.clear() + self._thread = threading.Thread(target=self._tee_thread, daemon=False) + self._thread.start() + self.syscapture.start() + self._state = "started" + + def done(self) -> None: + """Stop capturing, restore streams.""" + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return + + # Restore original FD first to stop new writes going to pipe + os.dup2(self.targetfd_save, self.targetfd) + + # Signal thread to shutdown and close pipe write end + self._shutdown.set() + os.close(self.pipe_w) + + # Wait for thread to finish processing all data + if self._thread is not None: + self._thread.join(timeout=5.0) + if self._thread.is_alive(): + warnings.warn( + f"FDCaptureTee thread for fd {self.targetfd} did not exit cleanly " + "within timeout. Some captured output may be lost.", + stacklevel=2, + ) + + os.close(self.pipe_r) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) + self.syscapture.done() + self._state = "done" + + def suspend(self) -> None: + """Suspend capturing, restoring original FD.""" + self._assert_state("suspend", ("started", "suspended")) + if self._state == "suspended": + return + self.syscapture.suspend() + # Request sync before suspending to ensure buffer is current + self._request_snap_sync() + os.dup2(self.targetfd_save, self.targetfd) + self._state = "suspended" + + def resume(self) -> None: + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return + self.syscapture.resume() + os.dup2(self.pipe_w, self.targetfd) + self._state = "started" + + +class FDCaptureBinaryTee(FDCaptureTeeBase[bytes]): + """Capture IO to/from a given OS-level file descriptor, with real-time tee. + + snap() produces `bytes`. + """ + + EMPTY_BUFFER = b"" + + def snap(self) -> bytes: + self._assert_state("snap", ("started", "suspended")) + # Request thread to drain pipe - no FD manipulation, no capture gap + self._request_snap_sync() + with self._buffer_lock: + res = self._buffer.getvalue() + self._buffer.seek(0) + self._buffer.truncate() + return res + + def writeorg(self, data: bytes) -> None: + """No-op: output is already tee'd in real-time by the background thread. + + This prevents duplicate output when pop_outerr_to_orig() replays captured data. + """ + pass + + +class FDCaptureTee(FDCaptureTeeBase[str]): + """Capture IO to/from a given OS-level file descriptor, with real-time tee. + + snap() produces text. + """ + + EMPTY_BUFFER = "" + + def snap(self) -> str: + self._assert_state("snap", ("started", "suspended")) + # Request thread to drain pipe - no FD manipulation, no capture gap + self._request_snap_sync() + with self._buffer_lock: + res = self._buffer.getvalue().decode("utf-8", errors="replace") + self._buffer.seek(0) + self._buffer.truncate() + return res + + def writeorg(self, data: str) -> None: + """No-op: output is already tee'd in real-time by the background thread. + + This prevents duplicate output when pop_outerr_to_orig() replays captured data. + """ + pass + + # MultiCapture @@ -720,6 +1092,10 @@ def _get_multicapture(method: _CaptureMethod) -> MultiCapture[str]: return MultiCapture( in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) ) + elif method == "tee-fd": + return MultiCapture( + in_=None, out=FDCaptureTee(1), err=FDCaptureTee(2) + ) raise ValueError(f"unknown capturing method: {method!r}") @@ -1142,3 +1518,67 @@ def test_system_echo(capfdbinary): yield capture_fixture capture_fixture.close() capman.unset_fixture() + + +@fixture +def capteefd(request: SubRequest) -> Generator[CaptureFixture[str]]: + r"""Enable text capturing of writes to file descriptors ``1`` and ``2``, + with real-time pass-through to the terminal. + + The captured output is made available via ``capteefd.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``text`` objects. + + Output is also passed through to the original file descriptors in real-time, + allowing visibility of test progress while still capturing for reports. + + Returns an instance of :class:`CaptureFixture[str] `. + + Example: + + .. code-block:: python + + def test_system_echo(capteefd): + os.system('echo "hello"') + captured = capteefd.readouterr() + assert captured.out == "hello\n" + """ + capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(FDCaptureTee, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@fixture +def capteefdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes]]: + r"""Enable bytes capturing of writes to file descriptors ``1`` and ``2``, + with real-time pass-through to the terminal. + + The captured output is made available via ``capteefdbinary.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``bytes`` objects. + + Output is also passed through to the original file descriptors in real-time, + allowing visibility of test progress while still capturing for reports. + + Returns an instance of :class:`CaptureFixture[bytes] `. + + Example: + + .. code-block:: python + + def test_system_echo(capteefdbinary): + os.system('echo "hello"') + captured = capteefdbinary.readouterr() + assert captured.out == b"hello\n" + """ + capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(FDCaptureBinaryTee, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() diff --git a/testing/test_capture.py b/testing/test_capture.py index 7aaba99fe43..535d308feca 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -10,6 +10,7 @@ import subprocess import sys import textwrap +import threading from typing import BinaryIO from typing import cast from typing import TextIO @@ -479,6 +480,47 @@ def test_one(capteesys): result.stdout.fnmatch_lines(["sTdoUt", "sTdeRr"]) # no tee, just reported assert not result.stderr.lines + def test_capteefd(self, pytester: Pytester) -> None: + """Test that capteefd captures FD-level output with tee passthrough.""" + p = pytester.makepyfile( + """\ + import os + def test_one(capteefd): + os.write(1, b"fDoUt") + os.write(2, b"fDeRr") + out, err = capteefd.readouterr() + assert out == "fDoUt" + assert err == "fDeRr" + """ + ) + # First test: just verify the capture works + result = pytester.runpytest(p, "--capture=fd") + assert result.ret == ExitCode.OK + + # Test with tee-fd: capture should work and output should be tee'd + result = pytester.runpytest(p, "--capture=tee-fd", "-rA") + assert result.ret == ExitCode.OK + # The captured output should appear in the report + result.stdout.fnmatch_lines(["*fDoUt*", "*fDeRr*"]) + + def test_capteefd_subprocess(self, pytester: Pytester) -> None: + """Test that capteefd captures subprocess output.""" + p = pytester.makepyfile( + """\ + import subprocess + import sys + def test_subprocess(capteefd): + subprocess.run([sys.executable, "-c", "print('subprocess_out')"]) + out, err = capteefd.readouterr() + assert "subprocess_out" in out + """ + ) + # Test that subprocess output is captured (the key feature of FD-level capture) + result = pytester.runpytest(p, "--capture=tee-fd", "-rA") + assert result.ret == ExitCode.OK + # The captured subprocess output should appear in the report + result.stdout.fnmatch_lines(["*subprocess_out*"]) + def test_capsyscapfd(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ @@ -1109,6 +1151,210 @@ def test_capfd_sys_stdout_mode(self, capfd) -> None: assert "b" not in sys.stdout.mode +class TestFDCaptureTee: + """Tests for FDCaptureTee - FD-level capture with real-time tee.""" + + def test_simple(self) -> None: + """Test basic capture and tee functionality.""" + with saved_fd(1): + cap = capture.FDCaptureTee(1) + cap.start() + os.write(1, b"hello") + s = cap.snap() + cap.done() + assert s == "hello" + + def test_tee_passthrough(self, tmpfile: BinaryIO) -> None: + """Test that output is passed through to original FD.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + os.write(fd, b"hello") + s = cap.snap() + cap.done() + assert s == "hello" + # Check that output was tee'd to original file + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == b"hello" + + def test_suspend_resume(self) -> None: + """Test suspend and resume functionality.""" + with saved_fd(1): + cap = capture.FDCaptureTee(1) + cap.start() + os.write(1, b"before") + s = cap.snap() + assert s == "before" + cap.suspend() + os.write(1, b"during_suspend") + assert not cap.snap() + cap.resume() + os.write(1, b"after") + s = cap.snap() + assert s == "after" + cap.done() + + def test_binary(self) -> None: + """Test FDCaptureBinaryTee returns bytes.""" + with saved_fd(1): + cap = capture.FDCaptureBinaryTee(1) + cap.start() + os.write(1, b"hello") + s = cap.snap() + cap.done() + assert s == b"hello" + assert isinstance(s, bytes) + + def test_large_output(self, tmpfile: BinaryIO) -> None: + """Test handling of large output that exceeds pipe buffer.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + # Write more than typical pipe buffer (64KB on many systems) + large_data = b"x" * 100000 + os.write(fd, large_data) + s = cap.snap() + cap.done() + assert len(s) == 100000 + assert s == "x" * 100000 + # Verify tee'd output + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == large_data + + def test_capture_and_tee_match(self, tmpfile: BinaryIO) -> None: + """Test that captured output matches tee'd output exactly.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + # Write multiple chunks + for i in range(10): + os.write(fd, f"chunk{i}\n".encode()) + captured = cap.snap() + cap.done() + # Verify captured matches tee'd + tmpfile.seek(0) + tee_output = tmpfile.read().decode() + assert captured == tee_output + + def test_output_pending_at_done(self, tmpfile: BinaryIO) -> None: + """Test that output written just before done() is still captured and tee'd.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + os.write(fd, b"final_output") + # Call done() immediately - data should still be captured + s = cap.snap() + cap.done() + assert s == "final_output" + # Verify it was also tee'd + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == b"final_output" + + def test_stderr_capture(self) -> None: + """Test capturing stderr with tee.""" + with saved_fd(2): + cap = capture.FDCaptureTee(2) + cap.start() + os.write(2, b"stderr_output") + s = cap.snap() + cap.done() + assert s == "stderr_output" + + def test_writeorg_is_noop(self, tmpfile: BinaryIO) -> None: + """Test that writeorg() is a no-op to prevent duplicate output.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + os.write(fd, b"hello") + s = cap.snap() + # writeorg should be a no-op since data was already tee'd + cap.writeorg("should_not_appear") + cap.done() + assert s == "hello" + # Verify only original data was tee'd, not the writeorg call + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == b"hello" + assert b"should_not_appear" not in tee_output + + def test_realtime_passthrough(self, tmpfile: BinaryIO) -> None: + """Test that output is passed through in real-time, before snap() is called.""" + import time + + fd = tmpfile.fileno() + # Save the original fd so we can read from it later + saved_fd = os.dup(fd) + + cap = capture.FDCaptureTee(fd) + cap.start() + + # Write data + os.write(fd, b"realtime_data") + + # Give the tee thread time to process (it runs in background) + time.sleep(0.05) + + # Check that data was tee'd BEFORE calling snap() + # Read from the saved fd (which points to the original file) + os.lseek(saved_fd, 0, os.SEEK_SET) + tee_output = os.read(saved_fd, 1024) + assert tee_output == b"realtime_data", "Output should appear in real-time before snap()" + + # Now snap should also have the data + captured = cap.snap() + cap.done() + os.close(saved_fd) + assert captured == "realtime_data" + + def test_concurrent_writes(self, tmpfile: BinaryIO) -> None: + """Test handling of concurrent writes from multiple threads.""" + import time + + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + + results = [] + errors = [] + + def writer(thread_id: int, count: int) -> None: + try: + for i in range(count): + os.write(fd, f"t{thread_id}:{i}\n".encode()) + except Exception as e: + errors.append(e) + + # Spawn multiple writer threads + threads = [] + for i in range(3): + t = threading.Thread(target=writer, args=(i, 10)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # Small delay for all data to flow through + time.sleep(0.1) + + captured = cap.snap() + cap.done() + + assert not errors, f"Writer threads had errors: {errors}" + + # Verify all writes were captured (30 lines total) + lines = captured.strip().split("\n") + assert len(lines) == 30, f"Expected 30 lines, got {len(lines)}" + + # Verify tee'd output matches captured + tmpfile.seek(0) + tee_output = tmpfile.read().decode() + assert tee_output == captured + + @contextlib.contextmanager def saved_fd(fd): new_fd = os.dup(fd) From 231fcfb3372b5be8636bfb2d2a2d393a800de025 Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 16:38:39 +0530 Subject: [PATCH 02/11] Add changelog entry for tee-fd capture --- changelog/14159.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/14159.feature.rst diff --git a/changelog/14159.feature.rst b/changelog/14159.feature.rst new file mode 100644 index 00000000000..c3747f513e1 --- /dev/null +++ b/changelog/14159.feature.rst @@ -0,0 +1 @@ +Added ``--capture=tee-fd`` capture mode and ``capteefd``/``capteefdbinary`` fixtures for FD-level capture with real-time terminal passthrough. From 39babc1a6a5ab5af69724da6674c2a929d924ba7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 May 2026 11:12:00 +0000 Subject: [PATCH 03/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/_pytest/capture.py | 9 +++------ testing/test_capture.py | 4 +++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 14cb6acb06a..f7ea910d84e 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -18,6 +18,7 @@ from types import TracebackType import warnings + # Platform-specific imports for non-blocking I/O if sys.platform != "win32": import fcntl @@ -796,7 +797,7 @@ def reader_thread() -> None: self._snap_requested.clear() self._snap_complete.set() - def _drain_queue_nonblocking(self, data_queue: "queue.Queue[bytes | None]") -> None: + def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: """Drain all immediately available data from queue (non-blocking).""" import queue @@ -944,7 +945,6 @@ def writeorg(self, data: bytes) -> None: This prevents duplicate output when pop_outerr_to_orig() replays captured data. """ - pass class FDCaptureTee(FDCaptureTeeBase[str]): @@ -970,7 +970,6 @@ def writeorg(self, data: str) -> None: This prevents duplicate output when pop_outerr_to_orig() replays captured data. """ - pass # MultiCapture @@ -1093,9 +1092,7 @@ def _get_multicapture(method: _CaptureMethod) -> MultiCapture[str]: in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) ) elif method == "tee-fd": - return MultiCapture( - in_=None, out=FDCaptureTee(1), err=FDCaptureTee(2) - ) + return MultiCapture(in_=None, out=FDCaptureTee(1), err=FDCaptureTee(2)) raise ValueError(f"unknown capturing method: {method!r}") diff --git a/testing/test_capture.py b/testing/test_capture.py index 535d308feca..52e5c985497 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -1301,7 +1301,9 @@ def test_realtime_passthrough(self, tmpfile: BinaryIO) -> None: # Read from the saved fd (which points to the original file) os.lseek(saved_fd, 0, os.SEEK_SET) tee_output = os.read(saved_fd, 1024) - assert tee_output == b"realtime_data", "Output should appear in real-time before snap()" + assert tee_output == b"realtime_data", ( + "Output should appear in real-time before snap()" + ) # Now snap should also have the data captured = cap.snap() From 865cd753df25231d2118330468eb9bb035fb6094 Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 17:00:11 +0530 Subject: [PATCH 04/11] Fix ruff and mypy errors --- src/_pytest/capture.py | 2 +- testing/test_capture.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index f7ea910d84e..a2523400b54 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -797,7 +797,7 @@ def reader_thread() -> None: self._snap_requested.clear() self._snap_complete.set() - def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: + def _drain_queue_nonblocking(self, data_queue: "queue.Queue[bytes | None]") -> None: """Drain all immediately available data from queue (non-blocking).""" import queue diff --git a/testing/test_capture.py b/testing/test_capture.py index 52e5c985497..fc5a28e687e 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -1319,8 +1319,7 @@ def test_concurrent_writes(self, tmpfile: BinaryIO) -> None: cap = capture.FDCaptureTee(fd) cap.start() - results = [] - errors = [] + errors: list[Exception] = [] def writer(thread_id: int, count: int) -> None: try: From 9ad759b74c6bb6227a0e531a9c6bc15b5d3b3164 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 May 2026 11:30:51 +0000 Subject: [PATCH 05/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/_pytest/capture.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index a2523400b54..f7ea910d84e 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -797,7 +797,7 @@ def reader_thread() -> None: self._snap_requested.clear() self._snap_complete.set() - def _drain_queue_nonblocking(self, data_queue: "queue.Queue[bytes | None]") -> None: + def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: """Drain all immediately available data from queue (non-blocking).""" import queue From f4b2928e143109eef4efc9bed026d11dcf3241c8 Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 17:05:50 +0530 Subject: [PATCH 06/11] Move queue import to module level --- src/_pytest/capture.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index f7ea910d84e..f2b2ac53028 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -12,6 +12,7 @@ import io from io import UnsupportedOperation import os +import queue import sys from tempfile import TemporaryFile import threading @@ -728,8 +729,6 @@ def _tee_thread_windows(self) -> None: We use a separate reader thread that pushes data to a queue, allowing the main tee thread to respond to snap requests without blocking. """ - import queue - data_queue: queue.Queue[bytes | None] = queue.Queue() reader_shutdown = threading.Event() @@ -799,8 +798,6 @@ def reader_thread() -> None: def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: """Drain all immediately available data from queue (non-blocking).""" - import queue - while True: try: data = data_queue.get_nowait() From de6ed6381bc4a7881b5fd3140d8fce2950a451bb Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 17:15:11 +0530 Subject: [PATCH 07/11] Add pragma no cover for platform-specific code --- src/_pytest/capture.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index f2b2ac53028..6f5eddf4316 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -682,9 +682,9 @@ def _tee_thread(self) -> None: Uses non-blocking reads with select/poll for proper synchronization. Responds to snap requests by draining the pipe and signaling completion. """ - if sys.platform == "win32": + if sys.platform == "win32": # pragma: no cover self._tee_thread_windows() - else: + else: # pragma: no cover self._tee_thread_unix() def _tee_thread_unix(self) -> None: @@ -837,7 +837,7 @@ def _request_snap_sync(self) -> None: self._snap_requested.set() # Wait for thread to acknowledge (with timeout) - if not self._snap_complete.wait(timeout=1.0): + if not self._snap_complete.wait(timeout=1.0): # pragma: no cover warnings.warn( f"FDCaptureTee snap sync for fd {self.targetfd} timed out. " "Some captured output may be incomplete.", @@ -883,7 +883,7 @@ def done(self) -> None: # Wait for thread to finish processing all data if self._thread is not None: self._thread.join(timeout=5.0) - if self._thread.is_alive(): + if self._thread.is_alive(): # pragma: no cover warnings.warn( f"FDCaptureTee thread for fd {self.targetfd} did not exit cleanly " "within timeout. Some captured output may be lost.", From 4f3c621788a3ae753ed4682d607fdeea2499a351 Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 17:29:31 +0530 Subject: [PATCH 08/11] Add pragma no cover for platform-specific methods --- src/_pytest/capture.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 6f5eddf4316..11c26a8771d 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -687,7 +687,7 @@ def _tee_thread(self) -> None: else: # pragma: no cover self._tee_thread_unix() - def _tee_thread_unix(self) -> None: + def _tee_thread_unix(self) -> None: # pragma: no cover """Unix implementation using select() for non-blocking reads.""" while not self._shutdown.is_set(): # Check for snap request @@ -722,7 +722,7 @@ def _tee_thread_unix(self) -> None: self._snap_requested.clear() self._snap_complete.set() - def _tee_thread_windows(self) -> None: + def _tee_thread_windows(self) -> None: # pragma: no cover """Windows implementation using a queue for non-blocking communication. On Windows, select() doesn't work on pipes and os.read() blocks indefinitely. @@ -796,7 +796,7 @@ def reader_thread() -> None: self._snap_requested.clear() self._snap_complete.set() - def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: + def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: # pragma: no cover """Drain all immediately available data from queue (non-blocking).""" while True: try: @@ -809,7 +809,7 @@ def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> Non except queue.Empty: break # No more data available - def _drain_nonblocking(self) -> None: + def _drain_nonblocking(self) -> None: # pragma: no cover """Drain all immediately available data from pipe (non-blocking).""" while True: try: From b14c7066c8a082c04ce478c2b2265995303de0af Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 May 2026 11:59:58 +0000 Subject: [PATCH 09/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/_pytest/capture.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 11c26a8771d..697b4e39e04 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -796,7 +796,9 @@ def reader_thread() -> None: self._snap_requested.clear() self._snap_complete.set() - def _drain_queue_nonblocking(self, data_queue: queue.Queue[bytes | None]) -> None: # pragma: no cover + def _drain_queue_nonblocking( + self, data_queue: queue.Queue[bytes | None] + ) -> None: # pragma: no cover """Drain all immediately available data from queue (non-blocking).""" while True: try: From 4bcc34d906eb8b51a90248e4b5ba8e1e08a777cd Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 19:52:11 +0530 Subject: [PATCH 10/11] Add coverage pragmas and capteefdbinary test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/_pytest/capture.py | 18 +++++++++--------- testing/test_capture.py | 16 +++++++++++++++- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 697b4e39e04..58aee9d3401 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -631,7 +631,7 @@ def __init__(self, targetfd: int) -> None: try: os.fstat(targetfd) - except OSError: + except OSError: # pragma: no cover self.targetfd_invalid: int | None = os.open(os.devnull, os.O_RDWR) os.dup2(self.targetfd_invalid, targetfd) else: @@ -656,7 +656,7 @@ def __init__(self, targetfd: int) -> None: self._snap_requested = threading.Event() self._snap_complete = threading.Event() - if targetfd == 0: + if targetfd == 0: # pragma: no cover self.syscapture: CaptureBase[str] = SysCapture(targetfd) else: if targetfd in patchsysdict: @@ -672,7 +672,7 @@ def _write_all(self, fd: int, data: bytes) -> None: while data: written = os.write(fd, data) data = data[written:] - except OSError: + except OSError: # pragma: no cover # Ignore write errors (e.g., broken pipe) - tee is best-effort pass @@ -832,7 +832,7 @@ def _request_snap_sync(self) -> None: This ensures all data written before this call is in the buffer. Does not modify FD state, so no capture gap. """ - if self._thread is None or not self._thread.is_alive(): + if self._thread is None or not self._thread.is_alive(): # pragma: no cover return self._snap_complete.clear() @@ -846,7 +846,7 @@ def _request_snap_sync(self) -> None: stacklevel=3, ) - def __repr__(self) -> str: + def __repr__(self) -> str: # pragma: no cover return ( f"<{self.__class__.__name__} {self.targetfd} oldfd={self.targetfd_save} " f"_state={self._state!r}>" @@ -872,7 +872,7 @@ def start(self) -> None: def done(self) -> None: """Stop capturing, restore streams.""" self._assert_state("done", ("initialized", "started", "suspended", "done")) - if self._state == "done": + if self._state == "done": # pragma: no cover return # Restore original FD first to stop new writes going to pipe @@ -894,7 +894,7 @@ def done(self) -> None: os.close(self.pipe_r) os.close(self.targetfd_save) - if self.targetfd_invalid is not None: + if self.targetfd_invalid is not None: # pragma: no cover if self.targetfd_invalid != self.targetfd: os.close(self.targetfd) os.close(self.targetfd_invalid) @@ -904,7 +904,7 @@ def done(self) -> None: def suspend(self) -> None: """Suspend capturing, restoring original FD.""" self._assert_state("suspend", ("started", "suspended")) - if self._state == "suspended": + if self._state == "suspended": # pragma: no cover return self.syscapture.suspend() # Request sync before suspending to ensure buffer is current @@ -914,7 +914,7 @@ def suspend(self) -> None: def resume(self) -> None: self._assert_state("resume", ("started", "suspended")) - if self._state == "started": + if self._state == "started": # pragma: no cover return self.syscapture.resume() os.dup2(self.pipe_w, self.targetfd) diff --git a/testing/test_capture.py b/testing/test_capture.py index fc5a28e687e..ed523b55cb1 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -521,6 +521,20 @@ def test_subprocess(capteefd): # The captured subprocess output should appear in the report result.stdout.fnmatch_lines(["*subprocess_out*"]) + def test_capteefdbinary(self, pytester: Pytester) -> None: + """Test that capteefdbinary fixture captures binary output.""" + p = pytester.makepyfile( + """\ + import os + def test_binary(capteefdbinary): + os.write(1, b"binary_output") + out, err = capteefdbinary.readouterr() + assert out == b"binary_output" + """ + ) + result = pytester.runpytest(p) + assert result.ret == ExitCode.OK + def test_capsyscapfd(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ @@ -1325,7 +1339,7 @@ def writer(thread_id: int, count: int) -> None: try: for i in range(count): os.write(fd, f"t{thread_id}:{i}\n".encode()) - except Exception as e: + except Exception as e: # pragma: no cover errors.append(e) # Spawn multiple writer threads From b770b9e2634184bb96184be5800eac57f8ad30c3 Mon Sep 17 00:00:00 2001 From: Fazeel Usmani Date: Fri, 1 May 2026 19:52:11 +0530 Subject: [PATCH 11/11] Add coverage pragmas and capteefdbinary test --- src/_pytest/capture.py | 18 +++++++++--------- testing/test_capture.py | 16 +++++++++++++++- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 697b4e39e04..58aee9d3401 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -631,7 +631,7 @@ def __init__(self, targetfd: int) -> None: try: os.fstat(targetfd) - except OSError: + except OSError: # pragma: no cover self.targetfd_invalid: int | None = os.open(os.devnull, os.O_RDWR) os.dup2(self.targetfd_invalid, targetfd) else: @@ -656,7 +656,7 @@ def __init__(self, targetfd: int) -> None: self._snap_requested = threading.Event() self._snap_complete = threading.Event() - if targetfd == 0: + if targetfd == 0: # pragma: no cover self.syscapture: CaptureBase[str] = SysCapture(targetfd) else: if targetfd in patchsysdict: @@ -672,7 +672,7 @@ def _write_all(self, fd: int, data: bytes) -> None: while data: written = os.write(fd, data) data = data[written:] - except OSError: + except OSError: # pragma: no cover # Ignore write errors (e.g., broken pipe) - tee is best-effort pass @@ -832,7 +832,7 @@ def _request_snap_sync(self) -> None: This ensures all data written before this call is in the buffer. Does not modify FD state, so no capture gap. """ - if self._thread is None or not self._thread.is_alive(): + if self._thread is None or not self._thread.is_alive(): # pragma: no cover return self._snap_complete.clear() @@ -846,7 +846,7 @@ def _request_snap_sync(self) -> None: stacklevel=3, ) - def __repr__(self) -> str: + def __repr__(self) -> str: # pragma: no cover return ( f"<{self.__class__.__name__} {self.targetfd} oldfd={self.targetfd_save} " f"_state={self._state!r}>" @@ -872,7 +872,7 @@ def start(self) -> None: def done(self) -> None: """Stop capturing, restore streams.""" self._assert_state("done", ("initialized", "started", "suspended", "done")) - if self._state == "done": + if self._state == "done": # pragma: no cover return # Restore original FD first to stop new writes going to pipe @@ -894,7 +894,7 @@ def done(self) -> None: os.close(self.pipe_r) os.close(self.targetfd_save) - if self.targetfd_invalid is not None: + if self.targetfd_invalid is not None: # pragma: no cover if self.targetfd_invalid != self.targetfd: os.close(self.targetfd) os.close(self.targetfd_invalid) @@ -904,7 +904,7 @@ def done(self) -> None: def suspend(self) -> None: """Suspend capturing, restoring original FD.""" self._assert_state("suspend", ("started", "suspended")) - if self._state == "suspended": + if self._state == "suspended": # pragma: no cover return self.syscapture.suspend() # Request sync before suspending to ensure buffer is current @@ -914,7 +914,7 @@ def suspend(self) -> None: def resume(self) -> None: self._assert_state("resume", ("started", "suspended")) - if self._state == "started": + if self._state == "started": # pragma: no cover return self.syscapture.resume() os.dup2(self.pipe_w, self.targetfd) diff --git a/testing/test_capture.py b/testing/test_capture.py index fc5a28e687e..ed523b55cb1 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -521,6 +521,20 @@ def test_subprocess(capteefd): # The captured subprocess output should appear in the report result.stdout.fnmatch_lines(["*subprocess_out*"]) + def test_capteefdbinary(self, pytester: Pytester) -> None: + """Test that capteefdbinary fixture captures binary output.""" + p = pytester.makepyfile( + """\ + import os + def test_binary(capteefdbinary): + os.write(1, b"binary_output") + out, err = capteefdbinary.readouterr() + assert out == b"binary_output" + """ + ) + result = pytester.runpytest(p) + assert result.ret == ExitCode.OK + def test_capsyscapfd(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ @@ -1325,7 +1339,7 @@ def writer(thread_id: int, count: int) -> None: try: for i in range(count): os.write(fd, f"t{thread_id}:{i}\n".encode()) - except Exception as e: + except Exception as e: # pragma: no cover errors.append(e) # Spawn multiple writer threads