diff --git a/changelog/14159.feature.rst b/changelog/14159.feature.rst new file mode 100644 index 00000000000..c3747f513e1 --- /dev/null +++ b/changelog/14159.feature.rst @@ -0,0 +1 @@ +Added ``--capture=tee-fd`` capture mode and ``capteefd``/``capteefdbinary`` fixtures for FD-level capture with real-time terminal passthrough. diff --git a/doc/en/how-to/capture-stdout-stderr.rst b/doc/en/how-to/capture-stdout-stderr.rst index 5de89bc0e3f..d0be844b650 100644 --- a/doc/en/how-to/capture-stdout-stderr.rst +++ b/doc/en/how-to/capture-stdout-stderr.rst @@ -32,7 +32,7 @@ a test. Setting capturing methods or disabling capturing ------------------------------------------------- -There are three ways in which ``pytest`` can perform capturing: +There are four ways in which ``pytest`` can perform capturing: * ``fd`` (file descriptor) level capturing (default): All writes going to the operating system file descriptors 1 and 2 will be captured. @@ -46,6 +46,11 @@ There are three ways in which ``pytest`` can perform capturing: the actual ``sys.stdout`` and ``sys.stderr``. This allows output to be 'live printed' and captured for plugin use, such as junitxml (new in pytest 5.4). +* ``tee-fd`` capturing: All writes going to file descriptors 1 and 2 will be + captured, including subprocess output. The writes will also be passed-through + to the original file descriptors in real-time. This combines the subprocess + capture ability of ``fd`` with the live output visibility of ``tee-sys``. + .. _`disable capturing`: You can influence output capturing mechanisms from the command line: @@ -57,6 +62,7 @@ You can influence output capturing mechanisms from the command line: pytest --capture=fd # also point filedescriptors 1 and 2 to temp file pytest --capture=tee-sys # combines 'sys' and '-s', capturing sys.stdout/stderr # and passing it along to the actual sys.stdout/stderr + pytest --capture=tee-fd # like 'fd' but also passes output through in real-time .. _printdebugging: diff --git a/doc/en/reference/fixtures.rst b/doc/en/reference/fixtures.rst index c4a8d01ff0e..98f1cd2dce1 100644 --- a/doc/en/reference/fixtures.rst +++ b/doc/en/reference/fixtures.rst @@ -39,6 +39,14 @@ Built-in fixtures :fixture:`capsysbinary` Capture, as bytes, output to ``sys.stdout`` and ``sys.stderr``. + :fixture:`capteefd` + Capture, as text, output to file descriptors 1 and 2, with real-time + pass-through. + + :fixture:`capteefdbinary` + Capture, as bytes, output to file descriptors 1 and 2, with real-time + pass-through. + :fixture:`cache` Store and retrieve values across pytest runs. diff --git a/doc/en/reference/reference.rst b/doc/en/reference/reference.rst index a69aa2c7887..b0f577b9b68 100644 --- a/doc/en/reference/reference.rst +++ b/doc/en/reference/reference.rst @@ -422,6 +422,26 @@ capteesys .. autofunction:: _pytest.capture.capteesys() :no-auto-options: +.. fixture:: capteefd + +capteefd +~~~~~~~~ + +**Tutorial**: :ref:`captures` + +.. autofunction:: _pytest.capture.capteefd() + :no-auto-options: + +.. fixture:: capteefdbinary + +capteefdbinary +~~~~~~~~~~~~~~ + +**Tutorial**: :ref:`captures` + +.. autofunction:: _pytest.capture.capteefdbinary() + :no-auto-options: + .. fixture:: capsysbinary capsysbinary @@ -3111,7 +3131,8 @@ Output Capture * ``fd``: capture at file descriptor level (default) * ``sys``: capture at sys level * ``no``: don't capture output - * ``tee-sys``: capture but also show output on terminal + * ``tee-sys``: capture sys level but also show output on terminal + * ``tee-fd``: capture fd level but also show output on terminal See :ref:`captures`. diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 6d98676be5f..58aee9d3401 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -12,9 +12,18 @@ import io from io import UnsupportedOperation import os +import queue import sys from tempfile import TemporaryFile +import threading from types import TracebackType +import warnings + + +# Platform-specific imports for non-blocking I/O +if sys.platform != "win32": + import fcntl + import select from typing import Any from typing import AnyStr from typing import BinaryIO @@ -43,7 +52,7 @@ from _pytest.reports import CollectReport -_CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] +_CaptureMethod = Literal["fd", "sys", "no", "tee-sys", "tee-fd"] def pytest_addoption(parser: Parser) -> None: @@ -53,8 +62,8 @@ def pytest_addoption(parser: Parser) -> None: action="store", default="fd", metavar="method", - choices=["fd", "sys", "no", "tee-sys"], - help="Per-test capturing method: one of fd|sys|no|tee-sys", + choices=["fd", "sys", "no", "tee-sys", "tee-fd"], + help="Per-test capturing method: one of fd|sys|no|tee-sys|tee-fd", ) group._addoption( # private to use reserved lower-case short option "-s", @@ -601,6 +610,367 @@ def writeorg(self, data: str) -> None: os.write(self.targetfd_save, data.encode("utf-8")) +class FDCaptureTeeBase(CaptureBase[AnyStr]): + """Base class for FD capture with real-time tee using pipe and thread. + + Uses a pipe to intercept FD writes. A background thread reads from the pipe, + writes to both the capture buffer and the original FD for real-time passthrough. + + Key design difference from FDCapture: since output is tee'd in real-time, + writeorg() is a no-op to prevent duplicate output when pop_outerr_to_orig() + replays captured data. + + Synchronization model: + - snap() requests a sync via _snap_requested event + - Thread acknowledges by setting _snap_complete event after draining + - This ensures all data written before snap() is in the buffer + """ + + def __init__(self, targetfd: int) -> None: + self.targetfd = targetfd + + try: + os.fstat(targetfd) + except OSError: # pragma: no cover + self.targetfd_invalid: int | None = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) + else: + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + # Create a pipe: writes to targetfd go to pipe_w, we read from pipe_r + self.pipe_r, self.pipe_w = os.pipe() + + # Set pipe_r to non-blocking for proper synchronization (Unix only) + if sys.platform != "win32": + flags = fcntl.fcntl(self.pipe_r, fcntl.F_GETFL) + fcntl.fcntl(self.pipe_r, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + # Buffer to collect captured output + self._buffer = io.BytesIO() + self._buffer_lock = threading.Lock() + + # Thread synchronization + self._thread: threading.Thread | None = None + self._shutdown = threading.Event() + self._snap_requested = threading.Event() + self._snap_complete = threading.Event() + + if targetfd == 0: # pragma: no cover + self.syscapture: CaptureBase[str] = SysCapture(targetfd) + else: + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, tee=True) + else: + self.syscapture = NoCapture(targetfd) + + self._state = "initialized" + + def _write_all(self, fd: int, data: bytes) -> None: + """Write all data to fd, handling partial writes.""" + try: + while data: + written = os.write(fd, data) + data = data[written:] + except OSError: # pragma: no cover + # Ignore write errors (e.g., broken pipe) - tee is best-effort + pass + + def _tee_thread(self) -> None: + """Background thread that reads from pipe and tees to buffer + original fd. + + Uses non-blocking reads with select/poll for proper synchronization. + Responds to snap requests by draining the pipe and signaling completion. + """ + if sys.platform == "win32": # pragma: no cover + self._tee_thread_windows() + else: # pragma: no cover + self._tee_thread_unix() + + def _tee_thread_unix(self) -> None: # pragma: no cover + """Unix implementation using select() for non-blocking reads.""" + while not self._shutdown.is_set(): + # Check for snap request + if self._snap_requested.is_set(): + # Drain all available data + self._drain_nonblocking() + self._snap_requested.clear() + self._snap_complete.set() + + # Wait for data with timeout + try: + readable, _, _ = select.select([self.pipe_r], [], [], 0.01) + if readable: + try: + data = os.read(self.pipe_r, 32768) + if not data: + break # EOF + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except BlockingIOError: + pass # No data available + except OSError: + break + except (ValueError, OSError): + break # Pipe closed + + # Final drain on shutdown + self._drain_nonblocking() + # Signal any pending snap + if self._snap_requested.is_set(): + self._snap_requested.clear() + self._snap_complete.set() + + def _tee_thread_windows(self) -> None: # pragma: no cover + """Windows implementation using a queue for non-blocking communication. + + On Windows, select() doesn't work on pipes and os.read() blocks indefinitely. + We use a separate reader thread that pushes data to a queue, allowing the + main tee thread to respond to snap requests without blocking. + """ + data_queue: queue.Queue[bytes | None] = queue.Queue() + reader_shutdown = threading.Event() + + def reader_thread() -> None: + """Helper thread that performs blocking reads and queues data.""" + try: + while not reader_shutdown.is_set(): + try: + data = os.read(self.pipe_r, 32768) + if not data: + data_queue.put(None) # EOF signal + break + data_queue.put(data) + except OSError: + data_queue.put(None) # Error signal + break + except Exception: + data_queue.put(None) + + reader = threading.Thread(target=reader_thread, daemon=True) + reader.start() + + try: + while not self._shutdown.is_set(): + # Check for snap request + if self._snap_requested.is_set(): + # Wait a bit for any in-flight reads to complete and be queued. + # This handles the race where snap() is called immediately after + # write() but before the reader thread has read the data. + # We do multiple short waits with queue checks to minimize latency. + for _ in range(5): # Up to 50ms total + try: + data = data_queue.get(timeout=0.01) + if data is None: + break + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except queue.Empty: + pass + # Final non-blocking drain + self._drain_queue_nonblocking(data_queue) + self._snap_requested.clear() + self._snap_complete.set() + + # Get data from queue with timeout to stay responsive + try: + data = data_queue.get(timeout=0.01) + if data is None: + break # EOF or error + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except queue.Empty: + pass # No data available, continue loop + + # Final drain on shutdown + self._drain_queue_nonblocking(data_queue) + finally: + reader_shutdown.set() + # Reader thread will exit when pipe is closed or on next read + + # Signal any pending snap + if self._snap_requested.is_set(): + self._snap_requested.clear() + self._snap_complete.set() + + def _drain_queue_nonblocking( + self, data_queue: queue.Queue[bytes | None] + ) -> None: # pragma: no cover + """Drain all immediately available data from queue (non-blocking).""" + while True: + try: + data = data_queue.get_nowait() + if data is None: + break # EOF marker + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except queue.Empty: + break # No more data available + + def _drain_nonblocking(self) -> None: # pragma: no cover + """Drain all immediately available data from pipe (non-blocking).""" + while True: + try: + data = os.read(self.pipe_r, 32768) + if not data: + break + with self._buffer_lock: + self._buffer.write(data) + self._write_all(self.targetfd_save, data) + except BlockingIOError: + break # No more data available + except OSError: + break + + def _request_snap_sync(self) -> None: + """Request thread to drain pipe and wait for completion. + + This ensures all data written before this call is in the buffer. + Does not modify FD state, so no capture gap. + """ + if self._thread is None or not self._thread.is_alive(): # pragma: no cover + return + + self._snap_complete.clear() + self._snap_requested.set() + + # Wait for thread to acknowledge (with timeout) + if not self._snap_complete.wait(timeout=1.0): # pragma: no cover + warnings.warn( + f"FDCaptureTee snap sync for fd {self.targetfd} timed out. " + "Some captured output may be incomplete.", + stacklevel=3, + ) + + def __repr__(self) -> str: # pragma: no cover + return ( + f"<{self.__class__.__name__} {self.targetfd} oldfd={self.targetfd_save} " + f"_state={self._state!r}>" + ) + + def _assert_state(self, op: str, states: tuple[str, ...]) -> None: + assert self._state in states, ( + "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + ) + + def start(self) -> None: + """Start capturing on targetfd using pipe with tee thread.""" + self._assert_state("start", ("initialized",)) + os.dup2(self.pipe_w, self.targetfd) + self._shutdown.clear() + self._thread = threading.Thread(target=self._tee_thread, daemon=False) + self._thread.start() + self.syscapture.start() + self._state = "started" + + def done(self) -> None: + """Stop capturing, restore streams.""" + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": # pragma: no cover + return + + # Restore original FD first to stop new writes going to pipe + os.dup2(self.targetfd_save, self.targetfd) + + # Signal thread to shutdown and close pipe write end + self._shutdown.set() + os.close(self.pipe_w) + + # Wait for thread to finish processing all data + if self._thread is not None: + self._thread.join(timeout=5.0) + if self._thread.is_alive(): # pragma: no cover + warnings.warn( + f"FDCaptureTee thread for fd {self.targetfd} did not exit cleanly " + "within timeout. Some captured output may be lost.", + stacklevel=2, + ) + + os.close(self.pipe_r) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: # pragma: no cover + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) + self.syscapture.done() + self._state = "done" + + def suspend(self) -> None: + """Suspend capturing, restoring original FD.""" + self._assert_state("suspend", ("started", "suspended")) + if self._state == "suspended": # pragma: no cover + return + self.syscapture.suspend() + # Request sync before suspending to ensure buffer is current + self._request_snap_sync() + os.dup2(self.targetfd_save, self.targetfd) + self._state = "suspended" + + def resume(self) -> None: + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": # pragma: no cover + return + self.syscapture.resume() + os.dup2(self.pipe_w, self.targetfd) + self._state = "started" + + +class FDCaptureBinaryTee(FDCaptureTeeBase[bytes]): + """Capture IO to/from a given OS-level file descriptor, with real-time tee. + + snap() produces `bytes`. + """ + + EMPTY_BUFFER = b"" + + def snap(self) -> bytes: + self._assert_state("snap", ("started", "suspended")) + # Request thread to drain pipe - no FD manipulation, no capture gap + self._request_snap_sync() + with self._buffer_lock: + res = self._buffer.getvalue() + self._buffer.seek(0) + self._buffer.truncate() + return res + + def writeorg(self, data: bytes) -> None: + """No-op: output is already tee'd in real-time by the background thread. + + This prevents duplicate output when pop_outerr_to_orig() replays captured data. + """ + + +class FDCaptureTee(FDCaptureTeeBase[str]): + """Capture IO to/from a given OS-level file descriptor, with real-time tee. + + snap() produces text. + """ + + EMPTY_BUFFER = "" + + def snap(self) -> str: + self._assert_state("snap", ("started", "suspended")) + # Request thread to drain pipe - no FD manipulation, no capture gap + self._request_snap_sync() + with self._buffer_lock: + res = self._buffer.getvalue().decode("utf-8", errors="replace") + self._buffer.seek(0) + self._buffer.truncate() + return res + + def writeorg(self, data: str) -> None: + """No-op: output is already tee'd in real-time by the background thread. + + This prevents duplicate output when pop_outerr_to_orig() replays captured data. + """ + + # MultiCapture @@ -720,6 +1090,8 @@ def _get_multicapture(method: _CaptureMethod) -> MultiCapture[str]: return MultiCapture( in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) ) + elif method == "tee-fd": + return MultiCapture(in_=None, out=FDCaptureTee(1), err=FDCaptureTee(2)) raise ValueError(f"unknown capturing method: {method!r}") @@ -1142,3 +1514,67 @@ def test_system_echo(capfdbinary): yield capture_fixture capture_fixture.close() capman.unset_fixture() + + +@fixture +def capteefd(request: SubRequest) -> Generator[CaptureFixture[str]]: + r"""Enable text capturing of writes to file descriptors ``1`` and ``2``, + with real-time pass-through to the terminal. + + The captured output is made available via ``capteefd.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``text`` objects. + + Output is also passed through to the original file descriptors in real-time, + allowing visibility of test progress while still capturing for reports. + + Returns an instance of :class:`CaptureFixture[str] `. + + Example: + + .. code-block:: python + + def test_system_echo(capteefd): + os.system('echo "hello"') + captured = capteefd.readouterr() + assert captured.out == "hello\n" + """ + capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(FDCaptureTee, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@fixture +def capteefdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes]]: + r"""Enable bytes capturing of writes to file descriptors ``1`` and ``2``, + with real-time pass-through to the terminal. + + The captured output is made available via ``capteefdbinary.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``bytes`` objects. + + Output is also passed through to the original file descriptors in real-time, + allowing visibility of test progress while still capturing for reports. + + Returns an instance of :class:`CaptureFixture[bytes] `. + + Example: + + .. code-block:: python + + def test_system_echo(capteefdbinary): + os.system('echo "hello"') + captured = capteefdbinary.readouterr() + assert captured.out == b"hello\n" + """ + capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture(FDCaptureBinaryTee, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() diff --git a/testing/test_capture.py b/testing/test_capture.py index 7aaba99fe43..ed523b55cb1 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -10,6 +10,7 @@ import subprocess import sys import textwrap +import threading from typing import BinaryIO from typing import cast from typing import TextIO @@ -479,6 +480,61 @@ def test_one(capteesys): result.stdout.fnmatch_lines(["sTdoUt", "sTdeRr"]) # no tee, just reported assert not result.stderr.lines + def test_capteefd(self, pytester: Pytester) -> None: + """Test that capteefd captures FD-level output with tee passthrough.""" + p = pytester.makepyfile( + """\ + import os + def test_one(capteefd): + os.write(1, b"fDoUt") + os.write(2, b"fDeRr") + out, err = capteefd.readouterr() + assert out == "fDoUt" + assert err == "fDeRr" + """ + ) + # First test: just verify the capture works + result = pytester.runpytest(p, "--capture=fd") + assert result.ret == ExitCode.OK + + # Test with tee-fd: capture should work and output should be tee'd + result = pytester.runpytest(p, "--capture=tee-fd", "-rA") + assert result.ret == ExitCode.OK + # The captured output should appear in the report + result.stdout.fnmatch_lines(["*fDoUt*", "*fDeRr*"]) + + def test_capteefd_subprocess(self, pytester: Pytester) -> None: + """Test that capteefd captures subprocess output.""" + p = pytester.makepyfile( + """\ + import subprocess + import sys + def test_subprocess(capteefd): + subprocess.run([sys.executable, "-c", "print('subprocess_out')"]) + out, err = capteefd.readouterr() + assert "subprocess_out" in out + """ + ) + # Test that subprocess output is captured (the key feature of FD-level capture) + result = pytester.runpytest(p, "--capture=tee-fd", "-rA") + assert result.ret == ExitCode.OK + # The captured subprocess output should appear in the report + result.stdout.fnmatch_lines(["*subprocess_out*"]) + + def test_capteefdbinary(self, pytester: Pytester) -> None: + """Test that capteefdbinary fixture captures binary output.""" + p = pytester.makepyfile( + """\ + import os + def test_binary(capteefdbinary): + os.write(1, b"binary_output") + out, err = capteefdbinary.readouterr() + assert out == b"binary_output" + """ + ) + result = pytester.runpytest(p) + assert result.ret == ExitCode.OK + def test_capsyscapfd(self, pytester: Pytester) -> None: p = pytester.makepyfile( """\ @@ -1109,6 +1165,211 @@ def test_capfd_sys_stdout_mode(self, capfd) -> None: assert "b" not in sys.stdout.mode +class TestFDCaptureTee: + """Tests for FDCaptureTee - FD-level capture with real-time tee.""" + + def test_simple(self) -> None: + """Test basic capture and tee functionality.""" + with saved_fd(1): + cap = capture.FDCaptureTee(1) + cap.start() + os.write(1, b"hello") + s = cap.snap() + cap.done() + assert s == "hello" + + def test_tee_passthrough(self, tmpfile: BinaryIO) -> None: + """Test that output is passed through to original FD.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + os.write(fd, b"hello") + s = cap.snap() + cap.done() + assert s == "hello" + # Check that output was tee'd to original file + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == b"hello" + + def test_suspend_resume(self) -> None: + """Test suspend and resume functionality.""" + with saved_fd(1): + cap = capture.FDCaptureTee(1) + cap.start() + os.write(1, b"before") + s = cap.snap() + assert s == "before" + cap.suspend() + os.write(1, b"during_suspend") + assert not cap.snap() + cap.resume() + os.write(1, b"after") + s = cap.snap() + assert s == "after" + cap.done() + + def test_binary(self) -> None: + """Test FDCaptureBinaryTee returns bytes.""" + with saved_fd(1): + cap = capture.FDCaptureBinaryTee(1) + cap.start() + os.write(1, b"hello") + s = cap.snap() + cap.done() + assert s == b"hello" + assert isinstance(s, bytes) + + def test_large_output(self, tmpfile: BinaryIO) -> None: + """Test handling of large output that exceeds pipe buffer.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + # Write more than typical pipe buffer (64KB on many systems) + large_data = b"x" * 100000 + os.write(fd, large_data) + s = cap.snap() + cap.done() + assert len(s) == 100000 + assert s == "x" * 100000 + # Verify tee'd output + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == large_data + + def test_capture_and_tee_match(self, tmpfile: BinaryIO) -> None: + """Test that captured output matches tee'd output exactly.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + # Write multiple chunks + for i in range(10): + os.write(fd, f"chunk{i}\n".encode()) + captured = cap.snap() + cap.done() + # Verify captured matches tee'd + tmpfile.seek(0) + tee_output = tmpfile.read().decode() + assert captured == tee_output + + def test_output_pending_at_done(self, tmpfile: BinaryIO) -> None: + """Test that output written just before done() is still captured and tee'd.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + os.write(fd, b"final_output") + # Call done() immediately - data should still be captured + s = cap.snap() + cap.done() + assert s == "final_output" + # Verify it was also tee'd + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == b"final_output" + + def test_stderr_capture(self) -> None: + """Test capturing stderr with tee.""" + with saved_fd(2): + cap = capture.FDCaptureTee(2) + cap.start() + os.write(2, b"stderr_output") + s = cap.snap() + cap.done() + assert s == "stderr_output" + + def test_writeorg_is_noop(self, tmpfile: BinaryIO) -> None: + """Test that writeorg() is a no-op to prevent duplicate output.""" + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + os.write(fd, b"hello") + s = cap.snap() + # writeorg should be a no-op since data was already tee'd + cap.writeorg("should_not_appear") + cap.done() + assert s == "hello" + # Verify only original data was tee'd, not the writeorg call + tmpfile.seek(0) + tee_output = tmpfile.read() + assert tee_output == b"hello" + assert b"should_not_appear" not in tee_output + + def test_realtime_passthrough(self, tmpfile: BinaryIO) -> None: + """Test that output is passed through in real-time, before snap() is called.""" + import time + + fd = tmpfile.fileno() + # Save the original fd so we can read from it later + saved_fd = os.dup(fd) + + cap = capture.FDCaptureTee(fd) + cap.start() + + # Write data + os.write(fd, b"realtime_data") + + # Give the tee thread time to process (it runs in background) + time.sleep(0.05) + + # Check that data was tee'd BEFORE calling snap() + # Read from the saved fd (which points to the original file) + os.lseek(saved_fd, 0, os.SEEK_SET) + tee_output = os.read(saved_fd, 1024) + assert tee_output == b"realtime_data", ( + "Output should appear in real-time before snap()" + ) + + # Now snap should also have the data + captured = cap.snap() + cap.done() + os.close(saved_fd) + assert captured == "realtime_data" + + def test_concurrent_writes(self, tmpfile: BinaryIO) -> None: + """Test handling of concurrent writes from multiple threads.""" + import time + + fd = tmpfile.fileno() + cap = capture.FDCaptureTee(fd) + cap.start() + + errors: list[Exception] = [] + + def writer(thread_id: int, count: int) -> None: + try: + for i in range(count): + os.write(fd, f"t{thread_id}:{i}\n".encode()) + except Exception as e: # pragma: no cover + errors.append(e) + + # Spawn multiple writer threads + threads = [] + for i in range(3): + t = threading.Thread(target=writer, args=(i, 10)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # Small delay for all data to flow through + time.sleep(0.1) + + captured = cap.snap() + cap.done() + + assert not errors, f"Writer threads had errors: {errors}" + + # Verify all writes were captured (30 lines total) + lines = captured.strip().split("\n") + assert len(lines) == 30, f"Expected 30 lines, got {len(lines)}" + + # Verify tee'd output matches captured + tmpfile.seek(0) + tee_output = tmpfile.read().decode() + assert tee_output == captured + + @contextlib.contextmanager def saved_fd(fd): new_fd = os.dup(fd)