diff --git a/Doc/library/subprocess.rst b/Doc/library/subprocess.rst index fe64daa3291d67..e169bb9b935647 100644 --- a/Doc/library/subprocess.rst +++ b/Doc/library/subprocess.rst @@ -261,6 +261,217 @@ underlying :class:`Popen` interface can be used directly. *stdout* and *stderr* attributes added +.. function:: run_pipeline(*commands, stdin=None, input=None, \ + stdout=None, stderr=None, capture_output=False, \ + timeout=None, check=False, encoding=None, \ + errors=None, text=None, env=None, \ + **other_popen_kwargs) + + Run a pipeline of commands connected via pipes, similar to shell pipelines. + Wait for all commands to complete, then return a :class:`CompletedPipeline` + instance. + + Each positional argument should be a command (a list of strings, or a string + if ``shell=True``) to execute. The standard output of each command is + connected to the standard input of the next command in the pipeline. + + This function requires at least two commands. For a single command, use + :func:`run` instead. + + If *capture_output* is true, the standard output of the final command and + the standard error of all commands will be captured. All processes in the + pipeline share a single stderr pipe, so their error output will be + interleaved. The *stdout* and *stderr* arguments may not be supplied at + the same time as *capture_output*. + + A *timeout* may be specified in seconds. If the timeout expires, all + child processes will be killed and waited for, and then a + :exc:`TimeoutExpired` exception will be raised. + + The *input* argument is passed to the first command's stdin. If used, it + must be a byte sequence, or a string if *encoding* or *errors* is specified + or *text* is true. + + If *check* is true, and any process in the pipeline exits with a non-zero + exit code, a :exc:`PipelineError` exception will be raised. This behavior + is similar to the shell's ``pipefail`` option. 
+ + If *encoding* or *errors* are specified, or *text* is true, file objects + are opened in text mode using the specified encoding and errors. + + .. note:: + + When using ``text=True`` with ``capture_output=True`` or ``stderr=PIPE``, + be aware that stderr output from multiple processes may be interleaved + in ways that produce incomplete multi-byte character sequences. For + reliable text decoding of stderr, consider capturing in binary mode + and decoding manually with appropriate error handling, or use + ``errors='replace'`` or ``errors='backslashreplace'``. + + .. note:: + + Passing ``stderr=STDOUT`` redirects each child process's stderr to its + own stdout file descriptor. For non-final processes in the pipeline + this means stderr is merged into the next process's stdin, which is + rarely what callers want. Only the final process's stderr ends up at + the pipeline's stdout destination. To merge stderr-and-stdout output + from the whole pipeline, use ``capture_output=True`` (or pass an + explicit file descriptor as *stderr*) instead of ``STDOUT``. + + .. note:: + + When stderr is captured (via ``capture_output=True`` or + ``stderr=PIPE``), every command in the pipeline inherits a copy of the + same stderr write file descriptor. If any child spawns a daemon or + grandchild that keeps stderr open after the child exits, the parent's + read on the stderr pipe will not see EOF and :func:`run_pipeline` will + hang. This matches shell ``2>&1 | other`` behavior. If this is a + concern, either do not capture stderr, manage each command with an + individual :class:`Popen` so each has its own stderr pipe, or ensure + grandchildren fully detach (closing inherited fds) before + daemonizing. + + If *stdin* is specified, it is connected to the first command's standard + input. If *stdout* is specified, it is connected to the last command's + standard output. 
When *stdout* is :data:`PIPE`, the output is available + in the returned :class:`CompletedPipeline`'s :attr:`~CompletedPipeline.stdout` + attribute. Other keyword arguments are passed to each :class:`Popen` call; + in particular *pass_fds* is forwarded to *every* command in the pipeline, + so any inheritable file descriptor passed via *pass_fds* is visible to all + children (unlike a shell, which can give each command its own fd set). + ``close_fds=False`` is rejected because inherited copies of the + inter-process pipe ends in sibling children would prevent EOF from being + signaled and cause deadlocks. + + Examples:: + + >>> import subprocess + >>> # Equivalent to: echo "hello world" | tr a-z A-Z + >>> result = subprocess.run_pipeline( + ... ['echo', 'hello world'], + ... ['tr', 'a-z', 'A-Z'], + ... capture_output=True, text=True + ... ) + >>> result.stdout + 'HELLO WORLD\n' + >>> result.returncodes + [0, 0] + + >>> # Pipeline with three commands + >>> result = subprocess.run_pipeline( + ... ['echo', 'one\ntwo\nthree'], + ... ['sort'], + ... ['head', '-n', '2'], + ... capture_output=True, text=True + ... ) + >>> result.stdout + 'one\nthree\n' + + >>> # Using input parameter + >>> result = subprocess.run_pipeline( + ... ['cat'], + ... ['wc', '-l'], + ... input='line1\nline2\nline3\n', + ... capture_output=True, text=True + ... ) + >>> result.stdout.strip() + '3' + + >>> # Error handling with check=True + >>> subprocess.run_pipeline( + ... ['echo', 'hello'], + ... ['false'], # exits with status 1 + ... check=True + ... ) + Traceback (most recent call last): + ... + subprocess.PipelineError: Pipeline failed: command 1 ['false'] returned 1 + + .. versionadded:: next + + +.. class:: CompletedPipeline + + The return value from :func:`run_pipeline`, representing a pipeline of + processes that have finished. + + .. attribute:: commands + + The list of commands used to launch the pipeline. Each command is a list + of strings (or a string if ``shell=True`` was used). 
+ + .. attribute:: returncodes + + List of exit status codes for each command in the pipeline. Typically, + an exit status of 0 indicates that the command ran successfully. + + A negative value ``-N`` indicates that the command was terminated by + signal ``N`` (POSIX only). + + .. attribute:: returncode + + Exit status of the final command in the pipeline. This is a convenience + property equivalent to ``returncodes[-1]``. + + Note that this matches shell behavior *without* ``pipefail``: a zero + ``returncode`` does not imply that earlier commands in the pipeline + succeeded. To check that all commands succeeded, use + :meth:`check_returncodes` or pass ``check=True`` to + :func:`run_pipeline`. + + .. attribute:: stdout + + Captured stdout from the final command in the pipeline. A bytes sequence, + or a string if :func:`run_pipeline` was called with an encoding, errors, + or ``text=True``. ``None`` if stdout was not captured. + + .. attribute:: stderr + + Captured stderr from all commands in the pipeline, combined. A bytes + sequence, or a string if :func:`run_pipeline` was called with an + encoding, errors, or ``text=True``. ``None`` if stderr was not captured. + + .. method:: check_returncodes() + + If any command's :attr:`returncode` is non-zero, raise a + :exc:`PipelineError`. + + .. versionadded:: next + + +.. exception:: PipelineError + + Subclass of :exc:`SubprocessError`, raised when a pipeline run by + :func:`run_pipeline` (with ``check=True``) contains one or more commands + that returned a non-zero exit status. This is similar to the shell's + ``pipefail`` behavior. + + .. attribute:: commands + + List of commands that were used in the pipeline. + + .. attribute:: returncodes + + List of exit status codes for each command in the pipeline. + + .. attribute:: stdout + + Output of the final command if it was captured. Otherwise, ``None``. + + .. attribute:: stderr + + Combined stderr output of all commands if it was captured. + Otherwise, ``None``. + + .. 
attribute:: failed + + List of ``(index, command, returncode)`` tuples for each command + that returned a non-zero exit status. The *index* is the position + of the command in the pipeline (0-based). + + .. versionadded:: next + + .. _frequently-used-arguments: Frequently Used Arguments diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 7ac2289f535b6d..54251aa6e90869 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -62,7 +62,8 @@ __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL", - "SubprocessError", "TimeoutExpired", "CompletedProcess"] + "SubprocessError", "TimeoutExpired", "CompletedProcess", + "run_pipeline", "CompletedPipeline", "PipelineError"] # NOTE: We intentionally exclude list2cmdline as it is # considered an internal implementation detail. issue10838. @@ -194,6 +195,39 @@ def stdout(self, value): self.output = value +class PipelineError(SubprocessError): + """Raised when run_pipeline() is called with check=True and one or more + commands in the pipeline return a non-zero exit status. + + Attributes: + commands: List of commands in the pipeline (each a list of strings). + returncodes: List of return codes corresponding to each command. + stdout: Standard output from the final command (if captured). + stderr: Standard error output (if captured). + failed: List of (index, command, returncode) tuples for failed commands. 
+ """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + super().__init__(commands, returncodes) + self.commands = commands + self.returncodes = returncodes + self.stdout = stdout + self.stderr = stderr + self.failed = [ + (i, cmd, rc) + for i, (cmd, rc) in enumerate(zip(commands, returncodes)) + if rc != 0 + ] + + def __str__(self): + LIMIT = 3 + head = self.failed[:LIMIT] + parts = [f"command {i} {cmd!r} returned {rc}" for i, cmd, rc in head] + extra = len(self.failed) - len(head) + if extra > 0: + parts.append(f"and {extra} more") + return f"Pipeline failed: {', '.join(parts)}" + + if _mswindows: class STARTUPINFO: def __init__(self, *, dwFlags=0, hStdInput=None, hStdOutput=None, @@ -289,6 +323,302 @@ def _cleanup(): DEVNULL = -3 +def _deadline_remaining(endtime): + """Calculate remaining time until deadline.""" + if endtime is None: + return None + return endtime - _time() + + +def _flush_stdin(stdin): + """Flush stdin, ignoring BrokenPipeError and closed file ValueError.""" + try: + stdin.flush() + except BrokenPipeError: + pass + except ValueError: + # Ignore ValueError: I/O operation on closed file. + if not stdin.closed: + raise + + +def _make_input_view(input_data): + """Convert input data to a byte memoryview for writing. + + Handles the case where input_data is already a memoryview with + non-byte elements (e.g., int32 array) by casting to a byte view. + This ensures len(view) returns the byte count, not element count. 
+ """ + if not input_data: + return None + if isinstance(input_data, memoryview): + return input_data.cast("b") # ensure byte view for correct len() + return memoryview(input_data) + + +def _translate_newlines(data, encoding, errors): + """Decode bytes to str and translate newlines to \n.""" + data = data.decode(encoding, errors) + return data.replace("\r\n", "\n").replace("\r", "\n") + + +def _communicate_io_posix(selector, stdin, input_view, input_offset, + output_buffers, endtime, *, close_on_eof=False): + """ + Low-level POSIX I/O multiplexing loop. + + This is the common core used by both _communicate_streams() and + Popen._communicate(). It handles the select loop for reading/writing + but does not manage stream lifecycle or raise timeout exceptions. + + Args: + selector: A _PopenSelector with streams already registered + stdin: Writable file object for input, or None + input_view: memoryview of input bytes, or None + input_offset: Starting offset into input_view (for resume support) + output_buffers: Dict {file_object: list} to append read chunks to + endtime: Deadline timestamp, or None for no timeout + close_on_eof: If True, close output streams immediately when they + EOF rather than leaving them open for the caller to close. + Used by Popen._communicate() to match its historical behavior + of releasing fds as soon as the child closes the corresponding + pipe. 
+ + Returns: + (new_input_offset, completed) + - new_input_offset: How many bytes of input were written + - completed: True if all I/O finished, False if timed out + + Note: + - Closes output streams on EOF only if close_on_eof=True + - Does NOT raise TimeoutExpired (caller handles) + - Appends to output_buffers lists in place + """ + stdin_fd = stdin.fileno() if stdin else None + + while selector.get_map(): + remaining = _deadline_remaining(endtime) + if remaining is not None and remaining <= 0: + return (input_offset, False) # Timed out + + ready = selector.select(remaining) + + # Check timeout after select (may have woken spuriously) + if endtime is not None and _time() > endtime: + return (input_offset, False) # Timed out + + for key, events in ready: + if key.fd == stdin_fd: + chunk = input_view[input_offset:input_offset + _PIPE_BUF] + try: + input_offset += os.write(key.fd, chunk) + except BrokenPipeError: + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + else: + if input_offset >= len(input_view): + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + elif key.fileobj in output_buffers: + data = os.read(key.fd, 32768) + if not data: + selector.unregister(key.fileobj) + if close_on_eof: + try: + key.fileobj.close() + except OSError: + pass + else: + output_buffers[key.fileobj].append(data) + + return (input_offset, True) # Completed + + +def _communicate_streams(stdin=None, input_data=None, read_streams=None, + timeout=None, cmd_for_timeout=None, + stdout_stream=None, stderr_stream=None): + """ + Multiplex I/O: write input_data to stdin, read from read_streams. + + All streams must be file objects (not raw file descriptors). + All I/O is done in binary mode; caller handles text encoding. 
+ + Args: + stdin: Writable binary file object for input, or None + input_data: Bytes to write to stdin, or None + read_streams: List of readable binary file objects to read from + timeout: Timeout in seconds, or None for no timeout + cmd_for_timeout: Value to use for TimeoutExpired.cmd + stdout_stream: File object in read_streams that holds stdout data, + or None. Used only to populate TimeoutExpired.output on a + partial timeout. + stderr_stream: File object in read_streams that holds stderr data, + or None. Used only to populate TimeoutExpired.stderr on a + partial timeout. + + Returns: + Dict mapping each file object in read_streams to its bytes data. + All file objects in read_streams will be closed. + + Raises: + TimeoutExpired: If timeout expires (with partial data) + """ + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + + read_streams = read_streams or [] + + if _mswindows: + return _communicate_streams_windows( + stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout, + stdout_stream, stderr_stream) + else: + return _communicate_streams_posix( + stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout, + stdout_stream, stderr_stream) + + +if _mswindows: + def _reader_thread_func(fh, buffer): + """Thread function to read from a file handle into a buffer list.""" + try: + buffer.append(fh.read()) + except OSError: + buffer.append(b'') + + def _writer_thread_func(fh, data, result): + """Thread function to write data to a file handle and close it.""" + try: + if data: + fh.write(data) + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + result.append(exc) + try: + fh.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL and not result: + result.append(exc) + + def _communicate_streams_windows(stdin, input_data, read_streams, + endtime, orig_timeout, cmd_for_timeout, + stdout_stream=None, stderr_stream=None): + """Windows 
implementation using threads.""" + threads = [] + buffers = {} + writer_thread = None + writer_result = [] + + if stdin and input_data: + writer_thread = threading.Thread( + target=_writer_thread_func, + args=(stdin, input_data, writer_result)) + writer_thread.daemon = True + writer_thread.start() + elif stdin: + try: + stdin.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + raise + + for stream in read_streams: + buf = [] + buffers[stream] = buf + t = threading.Thread(target=_reader_thread_func, args=(stream, buf)) + t.daemon = True + t.start() + threads.append((stream, t)) + + def _raise_timeout(): + results = {s: (b[0] if b else b'') for s, b in buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(stdout_stream), + stderr=results.get(stderr_stream)) + + # Drain the writer before any reader so a stalled write surfaces as + # the timeout source, not a partial read. + if writer_thread is not None: + remaining = _deadline_remaining(endtime) + if remaining is not None and remaining < 0: + remaining = 0 + writer_thread.join(remaining) + if writer_thread.is_alive(): + _raise_timeout() + if writer_result: + raise writer_result[0] + + for stream, t in threads: + remaining = _deadline_remaining(endtime) + if remaining is not None and remaining < 0: + remaining = 0 + t.join(remaining) + if t.is_alive(): + _raise_timeout() + + return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()} + +else: + def _communicate_streams_posix(stdin, input_data, read_streams, + endtime, orig_timeout, cmd_for_timeout, + stdout_stream=None, stderr_stream=None): + """POSIX implementation using selectors.""" + output_buffers = {stream: [] for stream in read_streams} + + if stdin: + _flush_stdin(stdin) + if not input_data: + try: + stdin.close() + except BrokenPipeError: + pass + stdin = None # don't register with selector + + input_view = _make_input_view(input_data) + + with 
_PopenSelector() as selector: + if stdin and input_data: + selector.register(stdin, selectors.EVENT_WRITE) + for stream in read_streams: + selector.register(stream, selectors.EVENT_READ) + + _, completed = _communicate_io_posix( + selector, stdin, input_view, 0, output_buffers, endtime) + + if not completed: + results = {stream: b''.join(chunks) + for stream, chunks in output_buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(stdout_stream), + stderr=results.get(stderr_stream)) + + results = {} + for stream, chunks in output_buffers.items(): + results[stream] = b''.join(chunks) + try: + stream.close() + except OSError: + pass + + return results + + # XXX This function is only used by multiprocessing and the test suite, # but it's here so that it can be imported when Python is compiled without # threads. @@ -509,6 +839,47 @@ def check_returncode(self): self.stderr) +class CompletedPipeline: + """A pipeline of processes that have finished running. + + This is returned by run_pipeline(). + + Attributes: + commands: List of commands in the pipeline (each command is a list). + returncodes: List of return codes for each command in the pipeline. + returncode: The return code of the final command (for convenience). + stdout: The standard output of the final command (None if not captured). + stderr: The standard error output (None if not captured). 
+ """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = list(commands) + self.returncodes = list(returncodes) + self.stdout = stdout + self.stderr = stderr + + @property + def returncode(self): + """Return the exit code of the final command in the pipeline.""" + return self.returncodes[-1] + + def __repr__(self): + args = [f'commands={self.commands!r}', + f'returncodes={self.returncodes!r}'] + if self.stdout is not None: + args.append(f'stdout={self.stdout!r}') + if self.stderr is not None: + args.append(f'stderr={self.stderr!r}') + return f"{type(self).__name__}({', '.join(args)})" + + __class_getitem__ = classmethod(types.GenericAlias) + + def check_returncodes(self): + """Raise PipelineError if any command's exit code is non-zero.""" + if any(rc != 0 for rc in self.returncodes): + raise PipelineError(self.commands, self.returncodes, + self.stdout, self.stderr) + + def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs): """Run command with arguments and return a CompletedProcess instance. @@ -579,6 +950,243 @@ def run(*popenargs, return CompletedProcess(process.args, retcode, stdout, stderr) +def run_pipeline(*commands, input=None, capture_output=False, timeout=None, + check=False, **kwargs): + """Run a pipeline of commands connected via pipes. + + Each positional argument should be a command (list of strings or a string + if shell=True) to execute. The stdout of each command is connected to the + stdin of the next command in the pipeline, similar to shell pipelines. + + Returns a CompletedPipeline instance with attributes commands, returncodes, + stdout, and stderr. By default, stdout and stderr are not captured, and + those attributes will be None. Pass capture_output=True to capture both + the final command's stdout and stderr from all commands. + + If check is True and any command's exit code is non-zero, it raises a + PipelineError. This is similar to shell "pipefail" behavior. 
+ + If timeout (seconds) is given and the pipeline takes too long, a + TimeoutExpired exception will be raised and all processes will be killed. + + The optional "input" argument allows passing bytes or a string to the + first command's stdin. If you use this argument, you may not also specify + stdin in kwargs. + + By default, all communication is in bytes. Use text=True, encoding, or + errors to enable text mode, which affects the input argument and stdout/ + stderr outputs. + + .. note:: + When using text=True with capture_output=True or stderr=PIPE, be aware + that stderr output from multiple processes may be interleaved in ways + that produce invalid character sequences when decoded. For reliable + text decoding, avoid text=True when capturing stderr from pipelines, + or handle decoding errors appropriately. + + Other keyword arguments are passed to each Popen call, except for stdin, + stdout, and stderr (when stderr=PIPE or capture_output=True), which are + managed by the pipeline. 
+ + Example: + # Equivalent to: cat file.txt | grep pattern | wc -l + result = run_pipeline( + ['cat', 'file.txt'], + ['grep', 'pattern'], + ['wc', '-l'], + capture_output=True, text=True + ) + print(result.stdout) # "42\\n" + print(result.returncodes) # [0, 0, 0] + """ + if len(commands) < 2: + raise ValueError('run_pipeline requires at least 2 commands') + + if input is not None and kwargs.get('stdin') is not None: + raise ValueError('stdin and input arguments may not both be used.') + if kwargs.get('stdin') is PIPE: + raise ValueError('stdin=PIPE is not supported by run_pipeline; ' + 'pass input= instead, or provide a file/fd') + + if capture_output: + if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None: + raise ValueError('stdout and stderr arguments may not be used ' + 'with capture_output.') + + if kwargs.get('close_fds') is False: + raise ValueError( + 'close_fds=False is not supported by run_pipeline; ' + 'inherited pipe ends would prevent EOF signaling between commands') + + stderr_arg = kwargs.pop('stderr', None) + capture_stderr = capture_output or (stderr_arg is PIPE) + + stdin_arg = kwargs.pop('stdin', None) + stdout_arg = kwargs.pop('stdout', None) + + # Load-bearing: pop text=/universal_newlines=/encoding=/errors= so each + # Popen keeps its parent-side pipes binary. _communicate_streams_* relies + # on a bytes-in/bytes-out contract; leaving these in kwargs would wrap the + # pipes in TextIOWrapper and break the threaded Windows backend. 
+ text = kwargs.pop('text', None) + universal_newlines = kwargs.pop('universal_newlines', None) + encoding = kwargs.pop('encoding', None) + errors_param = kwargs.pop('errors', None) + text_mode = bool(text or universal_newlines or encoding or errors_param) + if text_mode and encoding is None: + encoding = locale.getencoding() + + processes = [] + stderr_reader = None # File object for reading shared stderr (for parent) + stderr_write_fd = None # Write end of shared stderr pipe (for children) + + try: + # One shared stderr pipe across all children: lets stderr from any + # stage reach the parent through a single read end, which the I/O + # loop multiplexes alongside stdout. + if capture_stderr: + stderr_read_fd, stderr_write_fd = os.pipe() + stderr_reader = os.fdopen(stderr_read_fd, 'rb') + + for i, cmd in enumerate(commands): + is_first = (i == 0) + is_last = (i == len(commands) - 1) + + if is_first: + if input is not None: + proc_stdin = PIPE + else: + proc_stdin = stdin_arg # may be None, PIPE, fd, or file + else: + proc_stdin = processes[-1].stdout + + if is_last: + if capture_output: + proc_stdout = PIPE + else: + proc_stdout = stdout_arg # may be None, PIPE, fd, or file + else: + proc_stdout = PIPE + + if capture_stderr: + proc_stderr = stderr_write_fd + else: + proc_stderr = stderr_arg + + proc = Popen(cmd, stdin=proc_stdin, stdout=proc_stdout, + stderr=proc_stderr, **kwargs) + processes.append(proc) + + # Close the parent's copy of the previous process's stdout + # to allow the pipe to signal EOF when the previous process exits + if not is_first and processes[-2].stdout is not None: + processes[-2].stdout.close() + + # The parent must drop its write end so children's writes are the + # only ones keeping the pipe open; otherwise the reader never + # sees EOF after all children exit. 
+ if stderr_write_fd is not None: + os.close(stderr_write_fd) + stderr_write_fd = None + + first_proc = processes[0] + last_proc = processes[-1] + + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + + input_data = input + if input_data is not None and text_mode: + input_data = input_data.encode(encoding, errors_param or 'strict') + + read_streams = [] + if last_proc.stdout is not None: + read_streams.append(last_proc.stdout) + if stderr_reader is not None: + read_streams.append(stderr_reader) + + # Drive stdin, stdout, and stderr concurrently: any one of them + # filling its kernel pipe buffer would otherwise block a child + # whose progress depends on another stream draining. + stdin_stream = first_proc.stdin if input is not None else None + + try: + results = _communicate_streams( + stdin=stdin_stream, + input_data=input_data, + read_streams=read_streams, + timeout=_deadline_remaining(endtime), + cmd_for_timeout=commands, + stdout_stream=last_proc.stdout, + stderr_stream=stderr_reader, + ) + except TimeoutExpired: + for p in processes: + if p.poll() is None: + p.kill() + for p in processes: + p.wait() + raise + + stdout = results.get(last_proc.stdout) + stderr = results.get(stderr_reader) + + decode_errors = errors_param or 'strict' + if text_mode and stdout is not None: + stdout = _translate_newlines(stdout, encoding, decode_errors) + if text_mode and stderr is not None: + stderr = _translate_newlines(stderr, encoding, decode_errors) + + returncodes = [] + for proc in processes: + try: + remaining = _deadline_remaining(endtime) + proc.wait(timeout=remaining) + except TimeoutExpired: + for p in processes: + if p.poll() is None: + p.kill() + for p in processes: + p.wait() + raise TimeoutExpired(commands, timeout, stdout, stderr) + returncodes.append(proc.returncode) + + result = CompletedPipeline(commands, returncodes, stdout, stderr) + + if check and any(rc != 0 for rc in returncodes): + raise PipelineError(commands, returncodes, 
stdout, stderr) + + return result + + finally: + # Ensure all processes are cleaned up: kill all surviving children + # before waiting on any, so a hung wait() can't leave later + # children un-killed. + for proc in processes: + if proc.poll() is None: + proc.kill() + for proc in processes: + proc.wait() + for proc in processes: + if proc.stdin and not proc.stdin.closed: + proc.stdin.close() + if proc.stdout and not proc.stdout.closed: + proc.stdout.close() + # Close stderr pipe (reader is a file object, writer is a raw fd) + if stderr_reader is not None and not stderr_reader.closed: + try: + stderr_reader.close() + except OSError: + pass + if stderr_write_fd is not None: + try: + os.close(stderr_write_fd) + except OSError: + pass + + def list2cmdline(seq): """ Translate a sequence of arguments into a command line @@ -1149,8 +1757,8 @@ def universal_newlines(self, universal_newlines): self.text_mode = bool(universal_newlines) def _translate_newlines(self, data, encoding, errors): - data = data.decode(encoding, errors) - return data.replace("\r\n", "\n").replace("\r", "\n") + # Subclass-overridable hook; defers to the module-level helper. + return _translate_newlines(data, encoding, errors) def __enter__(self): return self @@ -2234,14 +2842,7 @@ def _communicate(self, input, endtime, orig_timeout): if self.stdin and not self._communication_started: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. - try: - self.stdin.flush() - except BrokenPipeError: - pass # communicate() must ignore BrokenPipeError. - except ValueError: - # ignore ValueError: I/O operation on closed file. 
- if not self.stdin.closed: - raise + _flush_stdin(self.stdin) if not input: try: self.stdin.close() @@ -2266,11 +2867,8 @@ def _communicate(self, input, endtime, orig_timeout): self._save_input(input) - if self._input: - if not isinstance(self._input, memoryview): - input_view = memoryview(self._input) - else: - input_view = self._input.cast("b") # byte input required + input_view = _make_input_view(self._input) + input_offset = self._input_offset if self._input else 0 with _PopenSelector() as selector: if self.stdin and not self.stdin.closed and self._input: @@ -2280,41 +2878,29 @@ def _communicate(self, input, endtime, orig_timeout): if self.stderr and not self.stderr.closed: selector.register(self.stderr, selectors.EVENT_READ) - while selector.get_map(): - timeout = self._remaining_time(endtime) - if timeout is not None and timeout <= 0: - self._check_timeout(endtime, orig_timeout, - stdout, stderr, - skip_check_and_raise=True) - raise RuntimeError( # Impossible :) - '_check_timeout(..., skip_check_and_raise=True) ' - 'failed to raise TimeoutExpired.') - - ready = selector.select(timeout) - self._check_timeout(endtime, orig_timeout, stdout, stderr) - - # XXX Rewrite these to use non-blocking I/O on the file - # objects; they are no longer using C stdio! 
- - for key, events in ready: - if key.fileobj is self.stdin: - chunk = input_view[self._input_offset : - self._input_offset + _PIPE_BUF] - try: - self._input_offset += os.write(key.fd, chunk) - except BrokenPipeError: - selector.unregister(key.fileobj) - key.fileobj.close() - else: - if self._input_offset >= len(input_view): - selector.unregister(key.fileobj) - key.fileobj.close() - elif key.fileobj in (self.stdout, self.stderr): - data = os.read(key.fd, 32768) - if not data: - selector.unregister(key.fileobj) - key.fileobj.close() - self._fileobj2output[key.fileobj].append(data) + stdin_to_write = (self.stdin if self.stdin and self._input + and not self.stdin.closed else None) + # Persist the returned offset on self so a subsequent + # communicate() after a TimeoutExpired resumes mid-input + # rather than re-sending bytes the child already consumed. + new_offset, completed = _communicate_io_posix( + selector, + stdin_to_write, + input_view, + input_offset, + self._fileobj2output, + endtime, + close_on_eof=True) + if self._input: + self._input_offset = new_offset + + if not completed: + self._check_timeout(endtime, orig_timeout, stdout, stderr, + skip_check_and_raise=True) + raise RuntimeError( # Impossible :) + '_check_timeout(..., skip_check_and_raise=True) ' + 'failed to raise TimeoutExpired.') + try: self.wait(timeout=self._remaining_time(endtime)) except TimeoutExpired as exc: diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 0c5679611848ea..ef468775cf75dd 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1090,6 +1090,39 @@ def test_communicate_timeout_large_input(self): p.kill() p.wait() + def test_communicate_timeout_resume_partial_write(self): + """Resume writing input after a partial-write TimeoutExpired. 
+ + Exercises the _input_offset bookkeeping across the + _communicate_io_posix factoring: a first communicate() must time out + mid-write, and a subsequent communicate() must finish delivering the + remaining bytes so the child receives the full input intact. + """ + # 1 MiB easily exceeds typical pipe buffers (~64 KiB) so writing + # blocks once the buffer fills before the child starts reading. + input_data = bytes(range(256)) * 4096 # 1 MiB, distinctive pattern + self.assertEqual(len(input_data), 1024 * 1024) + + p = subprocess.Popen( + [sys.executable, "-c", + "import sys, time; " + "time.sleep(0.5); " + "sys.stdout.buffer.write(sys.stdin.buffer.read())"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + try: + with self.assertRaises(subprocess.TimeoutExpired): + p.communicate(input_data, timeout=0.05) + + # Resume: no new input, generous timeout to avoid CI flakes. + stdout, stderr = p.communicate(timeout=support.LONG_TIMEOUT) + self.assertEqual(len(stdout), len(input_data)) + self.assertEqual(stdout, input_data) + finally: + p.kill() + p.wait() + # Test for the fd leak reported in http://bugs.python.org/issue2791. 
def test_communicate_pipe_fd_leak(self): for stdin_pipe in (False, True): @@ -1986,6 +2019,679 @@ def test_encoding_warning(self): self.assertStartsWith(lines[1], b":3: EncodingWarning: ") +class PipelineTestCase(BaseTestCase): + """Tests for subprocess.run_pipeline()""" + + def test_pipeline_basic(self): + """Test basic two-command pipeline""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello world")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'HELLO WORLD') + self.assertEqual(result.returncodes, [0, 0]) + self.assertEqual(result.returncode, 0) + + def test_pipeline_three_commands(self): + """Test pipeline with three commands""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("one\\ntwo\\nthree")'], + [sys.executable, '-c', 'import sys; print("".join(sorted(sys.stdin.readlines())))'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().strip().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'ONE\nTHREE\nTWO') + self.assertEqual(result.returncodes, [0, 0, 0]) + + def test_pipeline_with_input(self): + """Test pipeline with input data""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + input='hello', capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '5') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_memoryview_input(self): + """Test pipeline with memoryview input (byte elements)""" + test_data = b"Hello, memoryview pipeline!" 
+ mv = memoryview(test_data) + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'], + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + input=mv, capture_output=True + ) + self.assertEqual(result.stdout, test_data.upper()) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_memoryview_input_nonbyte(self): + """Test pipeline with non-byte memoryview input (e.g., int32). + + This tests the fix for gh-134453 where non-byte memoryviews + had incorrect length tracking on POSIX, causing data truncation. + """ + import array + # Create an array of 32-bit integers large enough to trigger + # chunked writing behavior (> PIPE_BUF) + pipe_buf = getattr(select, 'PIPE_BUF', 512) + # Each 'i' element is 4 bytes, need more than pipe_buf bytes total + num_elements = (pipe_buf // 4) + 100 + test_array = array.array('i', [0x41424344 for _ in range(num_elements)]) + expected_bytes = test_array.tobytes() + mv = memoryview(test_array) + + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'], + [sys.executable, '-c', + 'import sys; data = sys.stdin.buffer.read(); ' + 'sys.stdout.buffer.write(data)'], + input=mv, capture_output=True + ) + self.assertEqual(result.stdout, expected_bytes, + msg=f"{len(result.stdout)=} != {len(expected_bytes)=}") + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_bytes_mode(self): + """Test pipeline in binary mode""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(b"hello")'], + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + capture_output=True + ) + self.assertEqual(result.stdout, b'HELLO') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_error_check(self): + """Test that check=True raises PipelineError on 
failure""" + with self.assertRaises(subprocess.PipelineError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(1)'], + capture_output=True, check=True + ) + exc = cm.exception + self.assertEqual(len(exc.failed), 1) + self.assertEqual(exc.failed[0][0], 1) # Second command failed + self.assertEqual(exc.returncodes, [0, 1]) + + def test_pipeline_first_command_fails(self): + """Test pipeline where first command fails""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(42)'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True + ) + self.assertEqual(result.returncodes[0], 42) + + def test_pipeline_requires_two_commands(self): + """Test that pipeline requires at least 2 commands""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + capture_output=True + ) + self.assertIn('at least 2 commands', str(cm.exception)) + + def test_pipeline_stdin_and_input_conflict(self): + """Test that stdin and input cannot both be specified""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + input='data', stdin=subprocess.PIPE + ) + self.assertIn('stdin', str(cm.exception)) + self.assertIn('input', str(cm.exception)) + + def test_pipeline_stdin_pipe_rejected(self): + """Test that stdin=PIPE is rejected (would hang)""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + stdin=subprocess.PIPE + ) + self.assertIn('stdin=PIPE', str(cm.exception)) + + def test_pipeline_capture_output_conflict(self): + """Test that capture_output conflicts with stdout/stderr""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + 
capture_output=True, stdout=subprocess.PIPE + ) + self.assertIn('capture_output', str(cm.exception)) + + def test_pipeline_close_fds_false_rejected(self): + """Test that close_fds=False is rejected (would deadlock)""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + close_fds=False + ) + self.assertIn('close_fds', str(cm.exception)) + + def test_pipeline_universal_newlines(self): + """Test that universal_newlines=True works like text=True""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, universal_newlines=True + ) + self.assertIsInstance(result.stdout, str) + self.assertIn('HELLO', result.stdout) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_completed_repr(self): + """Test CompletedPipeline string representation""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("test")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, text=True + ) + repr_str = repr(result) + self.assertIn('CompletedPipeline', repr_str) + self.assertIn('commands=', repr_str) + self.assertIn('returncodes=', repr_str) + + def test_pipeline_check_returncodes_method(self): + """Test CompletedPipeline.check_returncodes() method""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(5)'], + capture_output=True + ) + with self.assertRaises(subprocess.PipelineError) as cm: + result.check_returncodes() + self.assertEqual(cm.exception.returncodes[1], 5) + + def test_pipeline_no_capture(self): + """Test pipeline without capturing output""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + ) + self.assertEqual(result.stdout, None) + self.assertEqual(result.stderr, None) + 
self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_stderr_capture(self): + """Test that stderr is captured from all processes""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print("err1", file=sys.stderr); print("out1")'], + [sys.executable, '-c', 'import sys; print("err2", file=sys.stderr); print(sys.stdin.read())'], + capture_output=True, text=True + ) + self.assertIn('err1', result.stderr) + self.assertIn('err2', result.stderr) + + @unittest.skipIf(mswindows, "POSIX specific test") + def test_pipeline_timeout(self): + """Test pipeline with timeout""" + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + [sys.executable, '-c', 'import time; time.sleep(10); print("done")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, timeout=0.1 + ) + + @unittest.skipIf(mswindows, "POSIX specific test") + def test_pipeline_timeout_stdout_devnull_stderr_pipe(self): + """Timeout when stdout=DEVNULL but stderr=PIPE keeps streams distinct. + + Regression: TimeoutExpired.output used to be populated with stderr + bytes whenever stdout was not captured. + """ + try: + subprocess.run_pipeline( + [sys.executable, '-c', 'import time; time.sleep(10)'], + [sys.executable, '-c', 'import sys; sys.stdin.read()'], + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + timeout=0.1, + ) + except subprocess.TimeoutExpired as e: + self.assertIsNone(e.output) + self.assertIsInstance(e.stderr, bytes) + else: + self.fail("TimeoutExpired not raised") + + @unittest.skipUnless(mswindows, "Windows backend specific") + def test_pipeline_timeout_windows(self): + """run_pipeline timeout on the Windows threaded backend. + + The Windows _communicate_streams_windows path uses threads + instead of a selector. Exercise its timeout handling directly. + e.output may be partially captured by the reader threads or be + None depending on timing; we only assert it is a bytes object + or None. 
+ """ + try: + subprocess.run_pipeline( + [sys.executable, '-c', + 'import time; time.sleep(10); print("done")'], + [sys.executable, '-c', + 'import sys; print(sys.stdin.read())'], + capture_output=True, timeout=0.5, + ) + except subprocess.TimeoutExpired as e: + self.assertTrue(e.output is None or isinstance(e.output, bytes)) + self.assertTrue(e.stderr is None or isinstance(e.stderr, bytes)) + else: + self.fail("TimeoutExpired not raised") + + @unittest.skipUnless(mswindows, "Windows backend specific") + def test_pipeline_timeout_windows_large_input(self): + """Windows writer-thread enforces timeout with large input to slow consumer. + + Mirror of test_pipeline_timeout_large_input that explicitly + exercises the Windows threaded backend's writer-thread timeout + path. The first process sleeps long before reading stdin, so + the writer thread has to honor the timeout rather than blocking + on the full pipe. + """ + # Input larger than typical pipe buffer (64KB). + input_data = 'x' * (128 * 1024) + + start = time.monotonic() + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + # First process sleeps before reading - simulates slow consumer. + [sys.executable, '-c', + 'import sys, time; time.sleep(30); print(sys.stdin.read())'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=input_data, capture_output=True, text=True, timeout=0.5, + ) + elapsed = time.monotonic() - start + + # Generous upper bound for slow Windows CI, but well under the + # 30s subprocess sleep so we know the timeout actually fired. + self.assertLess(elapsed, 10.0, + f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. 
" + "Input writing may have blocked without checking timeout.") + + def test_pipeline_error_str(self): + """Test PipelineError string representation""" + try: + subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(1)'], + [sys.executable, '-c', 'import sys; sys.exit(2)'], + capture_output=True, check=True + ) + except subprocess.PipelineError as e: + error_str = str(e) + self.assertIn('Pipeline failed', error_str) + + def test_pipeline_explicit_stdout_pipe(self): + """Test pipeline with explicit stdout=PIPE""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + stdout=subprocess.PIPE + ) + self.assertEqual(result.stdout.strip(), b'HELLO') + self.assertIsNone(result.stderr) + + def test_pipeline_stdin_from_file(self): + """Test pipeline with stdin from file""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write('file content\n') + f.flush() + fname = f.name + try: + with open(fname, 'r') as f: + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + stdin=f, capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '12') # "FILE CONTENT" + finally: + os.unlink(fname) + + def test_pipeline_stdout_to_devnull(self): + """Test pipeline with stdout to DEVNULL""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + stdout=subprocess.DEVNULL + ) + self.assertIsNone(result.stdout) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_large_data_no_deadlock(self): + """Test that large data doesn't cause pipe buffer deadlock. + + This test verifies that the multiplexed I/O implementation properly + handles cases where pipe buffers would fill up. 
Without proper + multiplexing, this would deadlock because: + 1. First process outputs large data filling stdout pipe buffer + 2. Middle process reads some, processes, writes to its stdout + 3. If stdout pipe buffer fills, middle process blocks on write + 4. But first process is blocked waiting for middle to read more + 5. Classic deadlock + + The test uses data larger than typical pipe buffer size (64KB on Linux) + to ensure the multiplexed I/O is working correctly. + """ + # Generate data larger than typical pipe buffer (64KB) + # Use 256KB to ensure we exceed buffer on most systems + large_data = 'x' * (256 * 1024) + + # Pipeline: input -> double the data -> count chars + # The middle process outputs twice as much, increasing buffer pressure + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; data = sys.stdin.read(); print(data + data)'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read().strip()))'], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + # Original data doubled = 512KB = 524288 chars + # Second process strips whitespace (removes trailing newline) then counts + expected_len = 256 * 1024 * 2 # doubled data, newline stripped + self.assertEqual(result.stdout.strip(), str(expected_len)) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_large_data_three_stages(self): + """Test large data through a three-stage pipeline. + + This is a more complex deadlock scenario with three processes, + where buffer pressure can occur at multiple points. 
+ """ + # Use 128KB of data + large_data = 'y' * (128 * 1024) + + # Pipeline: input -> uppercase -> add prefix to each line -> count + # We use line-based processing to create more buffer churn + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', + 'import sys; print("".join("PREFIX:" + line for line in sys.stdin))'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + self.assertEqual(result.returncodes, [0, 0, 0]) + # Just verify we got a reasonable numeric output without deadlock + output_len = int(result.stdout.strip()) + self.assertGreater(output_len, len(large_data)) + + def test_pipeline_large_data_with_stderr(self): + """Test large data with large stderr output from multiple processes. + + Ensures stderr collection doesn't interfere with the main data flow + and doesn't cause deadlocks when multiple processes write large + amounts to stderr concurrently with stdin/stdout data flow. 
+ """ + # 64KB of data through the pipeline + data_size = 64 * 1024 + large_data = 'z' * data_size + # Each process writes 64KB to stderr as well + stderr_size = 64 * 1024 + + result = subprocess.run_pipeline( + [sys.executable, '-c', f''' +import sys +# Write large stderr output +sys.stderr.write("E" * {stderr_size}) +sys.stderr.write("\\nstage1 done\\n") +# Pass through stdin to stdout +data = sys.stdin.read() +print(data) +'''], + [sys.executable, '-c', f''' +import sys +# Write large stderr output +sys.stderr.write("F" * {stderr_size}) +sys.stderr.write("\\nstage2 done\\n") +# Count input size +data = sys.stdin.read() +print(len(data.strip())) +'''], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + self.assertEqual(result.stdout.strip(), str(data_size)) + self.assertIn('stage1 done', result.stderr) + self.assertIn('stage2 done', result.stderr) + # > stderr_size (one stage's worth) confirms both stages' bytes + # survived multiplexing through the shared stderr pipe. + self.assertGreater(len(result.stderr), stderr_size) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_timeout_large_input(self): + """Test that timeout is enforced with large input to a slow pipeline. + + This verifies that run_pipeline() doesn't block indefinitely when + writing large input to a pipeline where the first process is slow + to consume stdin. The timeout should be enforced promptly. + + This is particularly important on Windows where stdin writing could + block without proper threading. 
+ """ + # Input larger than typical pipe buffer (64KB) + input_data = 'x' * (128 * 1024) + + start = time.monotonic() + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + # First process sleeps before reading - simulates slow consumer + [sys.executable, '-c', + 'import sys, time; time.sleep(30); print(sys.stdin.read())'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=input_data, capture_output=True, text=True, timeout=0.5 + ) + elapsed = time.monotonic() - start + + # Timeout should occur close to the specified timeout value, + # not after waiting for the subprocess to finish sleeping. + # Allow generous margin for slow CI, but must be well under + # the subprocess sleep time. + self.assertLess(elapsed, 5.0, + f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. " + "Input writing may have blocked without checking timeout.") + + def test_pipeline_check_true_success(self): + """check=True with all-successful commands returns normally""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("ok")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().strip())'], + capture_output=True, text=True, check=True + ) + self.assertEqual(result.returncodes, [0, 0]) + self.assertEqual(result.returncode, 0) + self.assertEqual(result.stdout.strip(), 'ok') + + def test_pipeline_stderr_to_stdout(self): + """stderr=STDOUT routes the final process's stderr to stdout""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("data")'], + [sys.executable, '-c', + 'import sys; sys.stdout.write(sys.stdin.read()); ' + 'sys.stderr.write("ERR\\n")'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + self.assertEqual(result.returncodes, [0, 0]) + self.assertIn(b'data', result.stdout) + self.assertIn(b'ERR', result.stdout) + self.assertIsNone(result.stderr) + + def test_pipeline_intermediate_stdout_closed_in_parent(self): + """Intermediate stdout pipes close in parent so producer 
sees EOF""" + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.write("x"); sys.stdout.flush(); ' + 'sys.stdout.write("y" * 200000)'], + [sys.executable, '-c', 'import sys; sys.stdin.read(1)'], + capture_output=True, timeout=10 + ) + self.assertEqual(result.returncodes[1], 0) + + def test_pipeline_error_pickle(self): + """PipelineError survives a pickle round-trip""" + import pickle + err = subprocess.PipelineError( + [['echo', 'hi'], ['false']], [0, 1], + stdout=b'hi\n', stderr=b'') + restored = pickle.loads(pickle.dumps(err)) + self.assertEqual(restored.commands, err.commands) + self.assertEqual(restored.returncodes, err.returncodes) + self.assertEqual(restored.stdout, err.stdout) + self.assertEqual(restored.stderr, err.stderr) + self.assertEqual(restored.failed, err.failed) + self.assertEqual(str(restored), str(err)) + + def test_pipeline_error_repr(self): + """repr(PipelineError(...)) is meaningful via Exception.args""" + err = subprocess.PipelineError( + [['echo', 'hi'], ['false']], [0, 1]) + r = repr(err) + self.assertIn('PipelineError', r) + self.assertIn('echo', r) + self.assertIn('false', r) + + @unittest.skipIf(mswindows, "POSIX shell-specific") + def test_pipeline_shell_true(self): + """shell=True forwards each command to the shell.""" + result = subprocess.run_pipeline( + 'echo hello world', + 'tr a-z A-Z', + shell=True, capture_output=True, text=True, + ) + self.assertEqual(result.stdout.strip(), 'HELLO WORLD') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_env(self): + """env= is propagated to every command in the pipeline.""" + env = os.environ.copy() + env['MY_TEST_VAR'] = 'pipeline_value' + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import os, sys; assert os.environ["MY_TEST_VAR"] == "pipeline_value"; ' + 'sys.stdout.write("first\\n")'], + [sys.executable, '-c', + 'import os, sys; assert os.environ["MY_TEST_VAR"] == "pipeline_value"; ' + 'sys.stdout.write(sys.stdin.read() + 
"second\\n")'], + env=env, capture_output=True, text=True, + ) + self.assertEqual(result.returncodes, [0, 0]) + self.assertIn('first', result.stdout) + self.assertIn('second', result.stdout) + + def test_pipeline_cwd(self): + """cwd= is propagated to every command in the pipeline.""" + with tempfile.TemporaryDirectory() as tmpdir: + expected = os.path.realpath(tmpdir) + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import os, sys; sys.stdout.write(os.getcwd() + "\\n")'], + [sys.executable, '-c', + 'import os, sys; sys.stdout.write(sys.stdin.read()); ' + 'sys.stdout.write(os.getcwd() + "\\n")'], + cwd=tmpdir, capture_output=True, text=True, + ) + lines = result.stdout.strip().split('\n') + self.assertEqual(len(lines), 2) + for line in lines: + self.assertEqual(os.path.realpath(line), expected) + self.assertEqual(result.returncodes, [0, 0]) + + @unittest.skipIf(mswindows, "pass_fds POSIX-specific") + def test_pipeline_pass_fds(self): + """pass_fds= forwards an inheritable fd to every command.""" + with tempfile.NamedTemporaryFile(mode='wb', delete=False) as f: + f.write(b'shared-content') + fname = f.name + try: + rfd = os.open(fname, os.O_RDONLY) + try: + result = subprocess.run_pipeline( + [sys.executable, '-c', + f'import os, sys; ' + f'data = os.pread({rfd}, 32, 0); ' + f'sys.stdout.write(data.decode() + "|")'], + [sys.executable, '-c', + f'import os, sys; ' + f'data = os.pread({rfd}, 32, 0); ' + f'sys.stdout.write(sys.stdin.read() + data.decode())'], + pass_fds=(rfd,), capture_output=True, text=True, + ) + finally: + os.close(rfd) + finally: + os.unlink(fname) + self.assertEqual(result.returncodes, [0, 0]) + self.assertEqual(result.stdout, 'shared-content|shared-content') + + def test_pipeline_stderr_pipe_normal_completion(self): + """stderr=PIPE captures stderr without capture_output= on the success path.""" + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; print("err1", file=sys.stderr); print("out1")'], + 
[sys.executable, '-c', + 'import sys; print("err2", file=sys.stderr); print(sys.stdin.read())'], + stderr=subprocess.PIPE, + ) + self.assertIsNone(result.stdout) + self.assertIsNotNone(result.stderr) + self.assertIn(b'err1', result.stderr) + self.assertIn(b'err2', result.stderr) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_errors_replace_multibyte_split(self): + """errors='replace' handles multi-byte stderr without raising.""" + result = subprocess.run_pipeline( + [sys.executable, '-c', + r'import sys; sys.stderr.buffer.write("é first ".encode()); ' + r'sys.stderr.flush(); ' + r'sys.stdout.write("data")'], + [sys.executable, '-c', + r'import sys; sys.stderr.buffer.write("中 second".encode()); ' + r'sys.stderr.flush(); ' + r'sys.stdout.write(sys.stdin.read())'], + capture_output=True, text=True, errors='replace', + ) + self.assertEqual(result.returncodes, [0, 0]) + self.assertIsInstance(result.stderr, str) + self.assertIn('first', result.stderr) + self.assertIn('second', result.stderr) + + def test_pipeline_middle_command_exits_early(self): + """Pipeline completes when a middle command exits without reading all input.""" + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys\n' + 'try:\n' + ' for i in range(100000):\n' + ' print(f"line{i}")\n' + 'except BrokenPipeError:\n' + ' pass\n'], + [sys.executable, '-c', + 'import sys\n' + 'print(sys.stdin.readline().strip())\n'], + [sys.executable, '-c', + 'import sys\n' + 'sys.stdout.write(sys.stdin.read())\n'], + capture_output=True, text=True, timeout=30, + ) + self.assertEqual(result.stdout.strip(), 'line0') + self.assertEqual(result.returncodes[1], 0) + self.assertEqual(result.returncodes[2], 0) + + def _get_test_grp_name(): for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): if grp: diff --git a/Misc/NEWS.d/next/Library/2026-04-25-12-00-00.gh-issue-47798.pIpEln.rst b/Misc/NEWS.d/next/Library/2026-04-25-12-00-00.gh-issue-47798.pIpEln.rst new file mode 100644 
index 00000000000000..6ae95ac8fb0c34 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-04-25-12-00-00.gh-issue-47798.pIpEln.rst @@ -0,0 +1,4 @@ +Add :func:`subprocess.run_pipeline` for running shell-style pipelines of +commands. The new :class:`subprocess.CompletedPipeline` and +:exc:`subprocess.PipelineError` types describe pipeline outcomes and check +failures, with ``check=True`` providing pipefail-style error handling.