Skip to content

Commit 5964b74

Browse files
gpshead and claude committed
gh-47798: Tighten run_pipeline comments to standing invariants
Drop section-label and restate-the-code comments added with run_pipeline and its helpers, and reframe the remaining ones around the invariants they document (pipe-EOF on parent close, drain-writer-before-readers, multiplexing prevents buffer-fill deadlocks, _input_offset persists for resume) so future readers get the why, not a narration of the code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e65e55d commit 5964b74

2 files changed

Lines changed: 20 additions & 44 deletions

File tree

Lib/subprocess.py

Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,6 @@ def _cleanup():
323323
DEVNULL = -3
324324

325325

326-
# Helper function for multiplexed I/O
327326
def _deadline_remaining(endtime):
328327
"""Calculate remaining time until deadline."""
329328
if endtime is None:
@@ -410,7 +409,6 @@ def _communicate_io_posix(selector, stdin, input_view, input_offset,
410409

411410
for key, events in ready:
412411
if key.fd == stdin_fd:
413-
# Write chunk to stdin
414412
chunk = input_view[input_offset:input_offset + _PIPE_BUF]
415413
try:
416414
input_offset += os.write(key.fd, chunk)
@@ -428,7 +426,6 @@ def _communicate_io_posix(selector, stdin, input_view, input_offset,
428426
except BrokenPipeError:
429427
pass
430428
elif key.fileobj in output_buffers:
431-
# Read chunk from output stream
432429
data = os.read(key.fd, 32768)
433430
if not data:
434431
selector.unregister(key.fileobj)
@@ -524,15 +521,13 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
524521
writer_thread = None
525522
writer_result = []
526523

527-
# Start writer thread to send input to stdin
528524
if stdin and input_data:
529525
writer_thread = threading.Thread(
530526
target=_writer_thread_func,
531527
args=(stdin, input_data, writer_result))
532528
writer_thread.daemon = True
533529
writer_thread.start()
534530
elif stdin:
535-
# No input data, just close stdin
536531
try:
537532
stdin.close()
538533
except BrokenPipeError:
@@ -541,7 +536,6 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
541536
if exc.errno != errno.EINVAL:
542537
raise
543538

544-
# Start reader threads for each stream
545539
for stream in read_streams:
546540
buf = []
547541
buffers[stream] = buf
@@ -557,51 +551,44 @@ def _raise_timeout():
557551
output=results.get(stdout_stream),
558552
stderr=results.get(stderr_stream))
559553

560-
# Join writer thread with timeout first
554+
# Drain the writer before any reader so a stalled write surfaces as
555+
# the timeout source, not a partial read.
561556
if writer_thread is not None:
562557
remaining = _deadline_remaining(endtime)
563558
if remaining is not None and remaining < 0:
564559
remaining = 0
565560
writer_thread.join(remaining)
566561
if writer_thread.is_alive():
567-
# Timed out during write - collect partial results
568562
_raise_timeout()
569-
# Check for write errors
570563
if writer_result:
571564
raise writer_result[0]
572565

573-
# Join reader threads with timeout
574566
for stream, t in threads:
575567
remaining = _deadline_remaining(endtime)
576568
if remaining is not None and remaining < 0:
577569
remaining = 0
578570
t.join(remaining)
579571
if t.is_alive():
580-
# Collect partial results
581572
_raise_timeout()
582573

583-
# Collect results
584574
return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()}
585575

586576
else:
587577
def _communicate_streams_posix(stdin, input_data, read_streams,
588578
endtime, orig_timeout, cmd_for_timeout,
589579
stdout_stream=None, stderr_stream=None):
590580
"""POSIX implementation using selectors."""
591-
# Build output buffers for each stream
592581
output_buffers = {stream: [] for stream in read_streams}
593582

594-
# Prepare stdin
595583
if stdin:
596584
_flush_stdin(stdin)
597585
if not input_data:
598586
try:
599587
stdin.close()
600588
except BrokenPipeError:
601589
pass
602-
stdin = None # Don't register with selector
590+
stdin = None # don't register with selector
603591

604-
# Prepare input data
605592
input_view = _make_input_view(input_data)
606593

607594
with _PopenSelector() as selector:
@@ -610,20 +597,17 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
610597
for stream in read_streams:
611598
selector.register(stream, selectors.EVENT_READ)
612599

613-
# Run the common I/O loop
614600
_, completed = _communicate_io_posix(
615601
selector, stdin, input_view, 0, output_buffers, endtime)
616602

617603
if not completed:
618-
# Timed out - collect partial results
619604
results = {stream: b''.join(chunks)
620605
for stream, chunks in output_buffers.items()}
621606
raise TimeoutExpired(
622607
cmd_for_timeout, orig_timeout,
623608
output=results.get(stdout_stream),
624609
stderr=results.get(stderr_stream))
625610

626-
# Build results and close all file objects
627611
results = {}
628612
for stream, chunks in output_buffers.items():
629613
results[stream] = b''.join(chunks)
@@ -1018,7 +1002,6 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
10181002
if len(commands) < 2:
10191003
raise ValueError('run_pipeline requires at least 2 commands')
10201004

1021-
# Validate no conflicting arguments
10221005
if input is not None and kwargs.get('stdin') is not None:
10231006
raise ValueError('stdin and input arguments may not both be used.')
10241007
if kwargs.get('stdin') is PIPE:
@@ -1035,12 +1018,9 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
10351018
'close_fds=False is not supported by run_pipeline; '
10361019
'inherited pipe ends would prevent EOF signaling between commands')
10371020

1038-
# Determine stderr handling - all processes share the same stderr pipe
1039-
# When capturing, we create one pipe and all processes write to it
10401021
stderr_arg = kwargs.pop('stderr', None)
10411022
capture_stderr = capture_output or (stderr_arg is PIPE)
10421023

1043-
# stdin is for the first process, stdout is for the last process
10441024
stdin_arg = kwargs.pop('stdin', None)
10451025
stdout_arg = kwargs.pop('stdout', None)
10461026

@@ -1061,7 +1041,9 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
10611041
stderr_write_fd = None # Write end of shared stderr pipe (for children)
10621042

10631043
try:
1064-
# Create a single stderr pipe that all processes will share
1044+
# One shared stderr pipe across all children: lets stderr from any
1045+
# stage reach the parent through a single read end, which the I/O
1046+
# loop multiplexes alongside stdout.
10651047
if capture_stderr:
10661048
stderr_read_fd, stderr_write_fd = os.pipe()
10671049
stderr_reader = os.fdopen(stderr_read_fd, 'rb')
@@ -1070,25 +1052,22 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
10701052
is_first = (i == 0)
10711053
is_last = (i == len(commands) - 1)
10721054

1073-
# Determine stdin for this process
10741055
if is_first:
10751056
if input is not None:
10761057
proc_stdin = PIPE
10771058
else:
1078-
proc_stdin = stdin_arg # Could be None, PIPE, fd, or file
1059+
proc_stdin = stdin_arg # may be None, PIPE, fd, or file
10791060
else:
10801061
proc_stdin = processes[-1].stdout
10811062

1082-
# Determine stdout for this process
10831063
if is_last:
10841064
if capture_output:
10851065
proc_stdout = PIPE
10861066
else:
1087-
proc_stdout = stdout_arg # Could be None, PIPE, fd, or file
1067+
proc_stdout = stdout_arg # may be None, PIPE, fd, or file
10881068
else:
10891069
proc_stdout = PIPE
10901070

1091-
# All processes share the same stderr pipe (write end)
10921071
if capture_stderr:
10931072
proc_stderr = stderr_write_fd
10941073
else:
@@ -1103,34 +1082,34 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
11031082
if not is_first and processes[-2].stdout is not None:
11041083
processes[-2].stdout.close()
11051084

1106-
# Close the write end of stderr pipe in parent - children have it
1085+
# The parent must drop its write end so children's writes are the
1086+
# only ones keeping the pipe open; otherwise the reader never
1087+
# sees EOF after all children exit.
11071088
if stderr_write_fd is not None:
11081089
os.close(stderr_write_fd)
11091090
stderr_write_fd = None
11101091

11111092
first_proc = processes[0]
11121093
last_proc = processes[-1]
11131094

1114-
# Calculate deadline for timeout (used throughout)
11151095
if timeout is not None:
11161096
endtime = _time() + timeout
11171097
else:
11181098
endtime = None
11191099

1120-
# Encode input if in text mode (text_mode/encoding resolved above).
11211100
input_data = input
11221101
if input_data is not None and text_mode:
11231102
input_data = input_data.encode(encoding, errors_param or 'strict')
11241103

1125-
# Build list of streams to read from
11261104
read_streams = []
11271105
if last_proc.stdout is not None:
11281106
read_streams.append(last_proc.stdout)
11291107
if stderr_reader is not None:
11301108
read_streams.append(stderr_reader)
11311109

1132-
# Use multiplexed I/O to handle stdin/stdout/stderr concurrently
1133-
# This avoids deadlocks from pipe buffer limits
1110+
# Drive stdin, stdout, and stderr concurrently: any one of them
1111+
# filling its kernel pipe buffer would otherwise block a child
1112+
# whose progress depends on another stream draining.
11341113
stdin_stream = first_proc.stdin if input is not None else None
11351114

11361115
try:
@@ -1144,33 +1123,28 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
11441123
stderr_stream=stderr_reader,
11451124
)
11461125
except TimeoutExpired:
1147-
# Kill all processes on timeout
11481126
for p in processes:
11491127
if p.poll() is None:
11501128
p.kill()
11511129
for p in processes:
11521130
p.wait()
11531131
raise
11541132

1155-
# Extract results
11561133
stdout = results.get(last_proc.stdout)
11571134
stderr = results.get(stderr_reader)
11581135

1159-
# Translate newlines if in text mode (decode and convert \r\n to \n)
11601136
decode_errors = errors_param or 'strict'
11611137
if text_mode and stdout is not None:
11621138
stdout = _translate_newlines(stdout, encoding, decode_errors)
11631139
if text_mode and stderr is not None:
11641140
stderr = _translate_newlines(stderr, encoding, decode_errors)
11651141

1166-
# Wait for all processes to complete (use remaining time from deadline)
11671142
returncodes = []
11681143
for proc in processes:
11691144
try:
11701145
remaining = _deadline_remaining(endtime)
11711146
proc.wait(timeout=remaining)
11721147
except TimeoutExpired:
1173-
# Kill all processes on timeout
11741148
for p in processes:
11751149
if p.poll() is None:
11761150
p.kill()
@@ -1783,7 +1757,7 @@ def universal_newlines(self, universal_newlines):
17831757
self.text_mode = bool(universal_newlines)
17841758

17851759
def _translate_newlines(self, data, encoding, errors):
1786-
# Method kept for subclass back-compat; logic lives at module level.
1760+
# Subclass-overridable hook; defers to the module-level helper.
17871761
return _translate_newlines(data, encoding, errors)
17881762

17891763
def __enter__(self):
@@ -2904,9 +2878,11 @@ def _communicate(self, input, endtime, orig_timeout):
29042878
if self.stderr and not self.stderr.closed:
29052879
selector.register(self.stderr, selectors.EVENT_READ)
29062880

2907-
# Use the common I/O loop (supports resume via _input_offset)
29082881
stdin_to_write = (self.stdin if self.stdin and self._input
29092882
and not self.stdin.closed else None)
2883+
# Persist the returned offset on self so a subsequent
2884+
# communicate() after a TimeoutExpired resumes mid-input
2885+
# rather than re-sending bytes the child already consumed.
29102886
new_offset, completed = _communicate_io_posix(
29112887
selector,
29122888
stdin_to_write,

Lib/test/test_subprocess.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2464,10 +2464,10 @@ def test_pipeline_large_data_with_stderr(self):
24642464
)
24652465

24662466
self.assertEqual(result.stdout.strip(), str(data_size))
2467-
# Verify both processes wrote to stderr
24682467
self.assertIn('stage1 done', result.stderr)
24692468
self.assertIn('stage2 done', result.stderr)
2470-
# Verify large stderr was captured (at least most of it)
2469+
# > stderr_size (one stage's worth) confirms both stages' bytes
2470+
# survived multiplexing through the shared stderr pipe.
24712471
self.assertGreater(len(result.stderr), stderr_size)
24722472
self.assertEqual(result.returncodes, [0, 0])
24732473

0 commit comments

Comments (0)