Blame view
tools/patman/cros_subprocess.py
15.5 KB
71162e3ca
|
1 2 3 4 5 6 7 |
# Copyright (c) 2012 The Chromium OS Authors.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/2.4/license for licensing details.

"""Subprocess execution

This module holds a subclass of subprocess.Popen with our own required
features, mainly that we get access to the subprocess output while it
is running rather than just at the end. This makes it easier to show
progress information and filter output in real time.
"""

import errno
import os
import pty
import select
import subprocess
import sys
import unittest

__all__ = ['PIPE', 'PIPE_PTY', 'STDOUT', 'Popen', 'stay_alive']

# Import these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT

# Pipe output through a pty.
# NOTE(review): on Python 3 this value is numerically equal to
# subprocess.DEVNULL, so DEVNULL cannot be told apart from PIPE_PTY here.
PIPE_PTY = -3

# Set to False to make CommunicateFilter() terminate the subprocess the next
# time its polling loop wakes up.
stay_alive = True


class Popen(subprocess.Popen):
    """Like subprocess.Popen with ptys and incremental output

    This class deals with running a child process and filtering its output on
    both stdout and stderr while it is running. We do this so we can monitor
    progress, and possibly relay the output to the user if requested.

    The class is similar to subprocess.Popen, the equivalent is something like:

        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    But this class has many fewer features, and two enhancements:

    1. Rather than getting the output data only at the end, this class sends it
         to a provided operation as it arrives.
    2. We use pseudo terminals so that the child will hopefully flush its output
         to us as soon as it is produced, rather than waiting for the end of a
         line.

    Use CommunicateFilter() to handle output from the subprocess.
    """

    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
                 shell=False, cwd=None, env=None, **kwargs):
        """Cut-down constructor

        Args:
            args: Program and arguments for subprocess to execute.
            stdin: See subprocess.Popen()
            stdout: See subprocess.Popen(), except that we support the
                sentinel value of cros_subprocess.PIPE_PTY.
            stderr: See subprocess.Popen(), except that we support the
                sentinel value of cros_subprocess.PIPE_PTY.
            shell: See subprocess.Popen()
            cwd: Working directory to change to for subprocess, or None if
                none.
            env: Environment to use for this subprocess, or None to inherit
                parent.
            kwargs: No other arguments are supported at the moment. Passing
                other arguments will cause a ValueError to be raised.

        Raises:
            ValueError: if any unsupported keyword argument is passed.
            OSError: if the subprocess cannot be started.
        """
        # Insist that unit tests exist for other arguments we don't support.
        # This is checked before anything is created so that a rejected call
        # does not leave a running child process or open ptys behind.
        if kwargs:
            raise ValueError("Unit tests do not test extra args - please add tests")

        stdout_pty = None
        stderr_pty = None
        try:
            if stdout == PIPE_PTY:
                stdout_pty = pty.openpty()
                stdout = stdout_pty[1]
            if stderr == PIPE_PTY:
                stderr_pty = pty.openpty()
                stderr = stderr_pty[1]

            super(Popen, self).__init__(args, stdin=stdin, stdout=stdout,
                                        stderr=stderr, shell=shell, cwd=cwd,
                                        env=env)
        except BaseException:
            # The child never started, so nobody will ever read the master
            # ends; close them rather than leaking the descriptors.
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is not None:
                    os.close(pty_pair[0])
            raise
        finally:
            # The child (if any) has its own copy of the slave ends. Our
            # copies must be closed so that we see end-of-file / an error on
            # the master once the child exits.
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is not None:
                    os.close(pty_pair[1])

        # If we're on a PTY, we passed the slave half of the PTY to the
        # subprocess. We want to use the master half on our end from now on.
        # Setting this here does make some assumptions about the
        # implementation of subprocess, but those assumptions are pretty
        # minor.

        # Note that if stderr is STDOUT, then self.stderr will be set to None
        # by this constructor.
        if stdout_pty is not None:
            self.stdout = os.fdopen(stdout_pty[0])
        if stderr_pty is not None:
            self.stderr = os.fdopen(stderr_pty[0])

    def ConvertData(self, data):
        """Convert stdout/stderr data to the correct format for output

        Args:
            data: Data to convert, or None for ''

        Returns:
            Converted data, as bytes
        """
        if data is None:
            return b''
        return data

    @staticmethod
    def _ReadChunk(stream):
        """Read the next fragment of output from a stream

        Args:
            stream: File object to read from; only its file descriptor is
                used, so no stdio buffering is involved.

        Returns:
            Up to 1024 bytes of data, or b'' at end-of-file or on error
        """
        try:
            return os.read(stream.fileno(), 1024)
        except OSError:
            # We will get an error on read if the pty is closed
            return b''

    @staticmethod
    def _TranslateNewlineBytes(data):
        """Convert '\\r\\n' and '\\r' line endings in bytes data to '\\n'"""
        return data.replace(b'\r\n', b'\n').replace(b'\r', b'\n')

    def CommunicateFilter(self, output, input_buf=None):
        """Interact with process: Read data from stdout and stderr.

        This method runs until end-of-file is reached, then waits for the
        subprocess to terminate.

        The output function is sent all output from the subprocess and must
        be defined like this:

            def Output([self,] stream, data)

        where stream is the stream the output was received on, which will be
        sys.stdout or sys.stderr, and data is a bytes object containing the
        data.

        Note: The data read is buffered in memory, so do not use this
        method if the data size is large or unlimited.

        Args:
            output: Function to call with each fragment of output, or None
                to just collect the output.
            input_buf: Bytes to send to the stdin of the subprocess. This is
                only used if the subprocess was created with stdin=PIPE. If
                it is empty or None, stdin is closed immediately.

        Returns:
            A tuple (stdout, stderr, combined) which is the data received on
            stdout, stderr and the combined data (interleaved stdout and
            stderr).

            Note that the interleaved output will only be sensible if you
            have set both stdout and stderr to PIPE or PIPE_PTY. Even then it
            depends on the timing of the output in the subprocess. If a
            subprocess flips between stdout and stderr quickly in succession,
            by the time we come to read the output from each we may see
            several lines in each, and will read all the stdout lines, then
            all the stderr lines. So the interleaving may not be correct. In
            this case you might want to pass stderr=cros_subprocess.STDOUT to
            the constructor.

            This feature is still useful for subprocesses where stderr is
            rarely used and indicates an error.

            Note also that if you set stderr to STDOUT, then stderr will be
            empty and the combined output will just be the same as stdout.
        """
        write_set = []
        stdout_chunks = None    # Stays None if there is no stdout stream
        stderr_chunks = None    # Stays None if there is no stderr stream
        combined_chunks = []

        if self.stdin:
            # Flush stdio buffer.  This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input_buf:
                write_set.append(self.stdin)
            else:
                self.stdin.close()

        # Each entry is (file object, list collecting its data, stream
        # identifier to pass to the output function).
        readers = []
        if self.stdout:
            stdout_chunks = []
            readers.append((self.stdout, stdout_chunks, sys.stdout))
        if self.stderr and self.stderr != self.stdout:
            stderr_chunks = []
            readers.append((self.stderr, stderr_chunks, sys.stderr))
        read_set = [reader[0] for reader in readers]

        input_offset = 0
        while read_set or write_set:
            try:
                # Use a timeout so that stay_alive is checked regularly even
                # when the subprocess is silent.
                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if not stay_alive:
                self.terminate()

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.  POSIX defines PIPE_BUF >= 512
                chunk = input_buf[input_offset:input_offset + 512]
                try:
                    input_offset += os.write(self.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The subprocess closed its stdin; drop remaining input.
                    input_offset = len(input_buf)
                if input_offset >= len(input_buf):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            for stream, chunks, dest in readers:
                if stream not in rlist:
                    continue
                data = self._ReadChunk(stream)
                if not data:
                    # End of file (or the pty was closed): stop watching it.
                    stream.close()
                    read_set.remove(stream)
                    continue
                chunks.append(data)
                combined_chunks.append(data)
                if output:
                    output(dest, data)

        # All data exchanged.  Translate lists into bytes.
        stdout = self.ConvertData(
            None if stdout_chunks is None else b''.join(stdout_chunks))
        stderr = self.ConvertData(
            None if stderr_chunks is None else b''.join(stderr_chunks))
        combined = self.ConvertData(b''.join(combined_chunks))

        # Translate newlines, if requested.  We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering). The data stays as bytes; only line endings change.
        if self.universal_newlines:
            if stdout:
                stdout = self._TranslateNewlineBytes(stdout)
            if stderr:
                stderr = self._TranslateNewlineBytes(stderr)

        self.wait()
        return (stdout, stderr, combined)


# Just being a unittest.TestCase gives us 14 public methods.  Unless we
# disable this, we can only have 6 tests in a TestCase.  That's not enough.
#
# pylint: disable=R0904

class TestSubprocess(unittest.TestCase):
    """Our simple unit test for this module"""

    class MyOperation:
        """Provides a operation that we can pass to Popen"""

        def __init__(self, input_to_send=None):
            """Constructor to set up the operation and possible input.

            Args:
                input_to_send: a text string to send when we first get input.
                    We will add \r\n to the string.
            """
            # Output arrives as bytes, so accumulate it as bytes
            self.stdout_data = b''
            self.stderr_data = b''
            self.combined_data = b''
            self.stdin_pipe = None
            self._input_to_send = input_to_send
            if input_to_send:
                pipe = os.pipe()
                self.stdin_read_pipe = pipe[0]
                self._stdin_write_pipe = os.fdopen(pipe[1], 'w')

        def Output(self, stream, data):
            """Output handler for Popen. Stores the data for later comparison"""
            if stream == sys.stdout:
                self.stdout_data += data
            if stream == sys.stderr:
                self.stderr_data += data
            self.combined_data += data

            # Output the input string if we have one.
            if self._input_to_send:
                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
                self._stdin_write_pipe.flush()

    def _BasicCheck(self, plist, oper):
        """Basic checks that the output looks sane."""
        self.assertEqual(plist[0], oper.stdout_data)
        self.assertEqual(plist[1], oper.stderr_data)
        self.assertEqual(plist[2], oper.combined_data)

        # The total length of stdout and stderr should equal the combined
        # length
        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))

    def test_simple(self):
        """Simple redirection: Get process list"""
        oper = TestSubprocess.MyOperation()
        plist = Popen(['ps']).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)

    def test_stderr(self):
        """Check stdout and stderr"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo fred >/dev/stderr && false || echo bad'
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'bad\r\n')
        self.assertEqual(plist[1], b'fred\r\n')

    def test_shell(self):
        """Check with and without shell works"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo test >/dev/stderr'
        self.assertRaises(OSError, Popen, [cmd], shell=False)
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[0]), 0)
        self.assertEqual(plist[1], b'test\r\n')

    def test_list_args(self):
        """Check with and without shell works using list arguments"""
        oper = TestSubprocess.MyOperation()
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], ' '.join(cmd[1:]).encode() + b'\r\n')
        self.assertEqual(len(plist[1]), 0)

        oper = TestSubprocess.MyOperation()

        # this should be interpreted as 'echo' with the other args dropped
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'\r\n')

    def test_cwd(self):
        """Check we can change directory"""
        for shell in (False, True):
            oper = TestSubprocess.MyOperation()
            proc = Popen('pwd', shell=shell, cwd='/tmp')
            plist = proc.CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'/tmp\r\n')

    def test_env(self):
        """Check we can change environment"""
        for add in (False, True):
            oper = TestSubprocess.MyOperation()
            # Work on a copy so the test does not alter our own environment
            env = dict(os.environ)
            if add:
                env['FRED'] = 'fred'
            cmd = 'echo $FRED'
            proc = Popen(cmd, shell=True, env=env)
            plist = proc.CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'fred\r\n' if add else b'\r\n')

    def test_extra_args(self):
        """Check we can't add extra arguments"""
        self.assertRaises(ValueError, Popen, 'true', close_fds=False)

    def test_basic_input(self):
        """Check that incremental input works

        We set up a subprocess which will prompt for name. When we see this
        prompt we send the name as input to the process. It should then print
        the name properly to stdout.
        """
        oper = TestSubprocess.MyOperation('Flash')
        prompt = 'What is your name?: '
        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                      shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[1]), 0)
        self.assertEqual(plist[0], prompt.encode() + b'Hello Flash\r\r\n')

    def test_isatty(self):
        """Check that ptys appear as terminals to the subprocess"""
        oper = TestSubprocess.MyOperation()
        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
               'else echo "not %d" >&%d; fi;')
        both_cmds = ''
        for fd in (1, 2):
            both_cmds += cmd % (fd, fd, fd, fd, fd)
        plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'terminal 1\r\n')
        self.assertEqual(plist[1], b'terminal 2\r\n')

        # Now try with PIPE and make sure it is not a terminal
        oper = TestSubprocess.MyOperation()
        plist = Popen(both_cmds, stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE,
                      shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'not 1\n')
        self.assertEqual(plist[1], b'not 2\n')


if __name__ == '__main__':
    unittest.main()