Blame view

tools/patman/cros_subprocess.py 15.5 KB
71162e3ca   Simon Glass   patman: Add cros_...
1
2
3
4
5
6
7
  # Copyright (c) 2012 The Chromium OS Authors.
  # Use of this source code is governed by a BSD-style license that can be
  # found in the LICENSE file.
  #
  # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
  # Licensed to PSF under a Contributor Agreement.
  # See http://www.python.org/2.4/license for licensing details.
ab4a6aba7   Anatolij Gustschin   patman: fix some ...
8
  """Subprocess execution
71162e3ca   Simon Glass   patman: Add cros_...
9
10
11
  
  This module holds a subclass of subprocess.Popen with our own required
  features, mainly that we get access to the subprocess output while it
ab4a6aba7   Anatolij Gustschin   patman: fix some ...
12
  is running rather than just at the end. This makes it easier to show
71162e3ca   Simon Glass   patman: Add cros_...
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  progress information and filter output in real time.
  """
  
  import errno
  import os
  import pty
  import select
  import subprocess
  import sys
  import unittest
  
  
  # Import these here so the caller does not need to import subprocess also.
  PIPE = subprocess.PIPE
  STDOUT = subprocess.STDOUT
  PIPE_PTY = -3     # Pipe output through a pty
  stay_alive = True
  
  
  class Popen(subprocess.Popen):
      """Like subprocess.Popen with ptys and incremental output
  
      This class deals with running a child process and filtering its output on
      both stdout and stderr while it is running. We do this so we can monitor
      progress, and possibly relay the output to the user if requested.
  
      The class is similar to subprocess.Popen, the equivalent is something like:
  
          Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  
      But this class has many fewer features, and two enhancement:
  
      1. Rather than getting the output data only at the end, this class sends it
           to a provided operation as it arrives.
      2. We use pseudo terminals so that the child will hopefully flush its output
           to us as soon as it is produced, rather than waiting for the end of a
           line.
  
      Use CommunicateFilter() to handle output from the subprocess.
  
      """
  
      def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
3b1c0b09c   Simon Glass   patman: Drop bina...
56
                   shell=False, cwd=None, env=None, **kwargs):
71162e3ca   Simon Glass   patman: Add cros_...
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
          """Cut-down constructor
  
          Args:
              args: Program and arguments for subprocess to execute.
              stdin: See subprocess.Popen()
              stdout: See subprocess.Popen(), except that we support the sentinel
                      value of cros_subprocess.PIPE_PTY.
              stderr: See subprocess.Popen(), except that we support the sentinel
                      value of cros_subprocess.PIPE_PTY.
              shell: See subprocess.Popen()
              cwd: Working directory to change to for subprocess, or None if none.
              env: Environment to use for this subprocess, or None to inherit parent.
              kwargs: No other arguments are supported at the moment.    Passing other
                      arguments will cause a ValueError to be raised.
          """
          stdout_pty = None
          stderr_pty = None
  
          if stdout == PIPE_PTY:
              stdout_pty = pty.openpty()
              stdout = os.fdopen(stdout_pty[1])
          if stderr == PIPE_PTY:
              stderr_pty = pty.openpty()
              stderr = os.fdopen(stderr_pty[1])
  
          super(Popen, self).__init__(args, stdin=stdin,
                  stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
                  **kwargs)
  
          # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
          # We want to use the master half on our end from now on.    Setting this here
          # does make some assumptions about the implementation of subprocess, but
          # those assumptions are pretty minor.
  
          # Note that if stderr is STDOUT, then self.stderr will be set to None by
          # this constructor.
          if stdout_pty is not None:
              self.stdout = os.fdopen(stdout_pty[0])
          if stderr_pty is not None:
              self.stderr = os.fdopen(stderr_pty[0])
  
          # Insist that unit tests exist for other arguments we don't support.
          if kwargs:
              raise ValueError("Unit tests do not test extra args - please add tests")
b1793a531   Simon Glass   patman: Update cr...
101
102
103
104
105
106
107
108
109
110
111
112
      def ConvertData(self, data):
          """Convert stdout/stderr data to the correct format for output
  
          Args:
              data: Data to convert, or None for ''
  
          Returns:
              Converted data, as bytes
          """
          if data is None:
              return b''
          return data
71162e3ca   Simon Glass   patman: Add cros_...
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
      def CommunicateFilter(self, output):
          """Interact with process: Read data from stdout and stderr.
  
          This method runs until end-of-file is reached, then waits for the
          subprocess to terminate.
  
          The output function is sent all output from the subprocess and must be
          defined like this:
  
              def Output([self,] stream, data)
              Args:
                  stream: the stream the output was received on, which will be
                          sys.stdout or sys.stderr.
                  data: a string containing the data
  
          Note: The data read is buffered in memory, so do not use this
          method if the data size is large or unlimited.
  
          Args:
              output: Function to call with each fragment of output.
  
          Returns:
              A tuple (stdout, stderr, combined) which is the data received on
              stdout, stderr and the combined data (interleaved stdout and stderr).
  
              Note that the interleaved output will only be sensible if you have
              set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
              the timing of the output in the subprocess. If a subprocess flips
              between stdout and stderr quickly in succession, by the time we come to
              read the output from each we may see several lines in each, and will read
              all the stdout lines, then all the stderr lines. So the interleaving
              may not be correct. In this case you might want to pass
              stderr=cros_subprocess.STDOUT to the constructor.
  
              This feature is still useful for subprocesses where stderr is
              rarely used and indicates an error.
  
              Note also that if you set stderr to STDOUT, then stderr will be empty
              and the combined output will just be the same as stdout.
          """
  
          read_set = []
          write_set = []
          stdout = None # Return
          stderr = None # Return
  
          if self.stdin:
              # Flush stdio buffer.    This might block, if the user has
              # been writing to .stdin in an uncontrolled fashion.
              self.stdin.flush()
              if input:
                  write_set.append(self.stdin)
              else:
                  self.stdin.close()
          if self.stdout:
              read_set.append(self.stdout)
b1793a531   Simon Glass   patman: Update cr...
169
              stdout = b''
71162e3ca   Simon Glass   patman: Add cros_...
170
171
          if self.stderr and self.stderr != self.stdout:
              read_set.append(self.stderr)
b1793a531   Simon Glass   patman: Update cr...
172
173
              stderr = b''
          combined = b''
71162e3ca   Simon Glass   patman: Add cros_...
174
175
176
177
178
  
          input_offset = 0
          while read_set or write_set:
              try:
                  rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
ac3fde939   Paul Burton   patman: Make exce...
179
              except select.error as e:
71162e3ca   Simon Glass   patman: Add cros_...
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
                  if e.args[0] == errno.EINTR:
                      continue
                  raise
  
              if not stay_alive:
                      self.terminate()
  
              if self.stdin in wlist:
                  # When select has indicated that the file is writable,
                  # we can write up to PIPE_BUF bytes without risk
                  # blocking.    POSIX defines PIPE_BUF >= 512
                  chunk = input[input_offset : input_offset + 512]
                  bytes_written = os.write(self.stdin.fileno(), chunk)
                  input_offset += bytes_written
                  if input_offset >= len(input):
                      self.stdin.close()
                      write_set.remove(self.stdin)
  
              if self.stdout in rlist:
b1793a531   Simon Glass   patman: Update cr...
199
                  data = b''
71162e3ca   Simon Glass   patman: Add cros_...
200
201
202
203
204
                  # We will get an error on read if the pty is closed
                  try:
                      data = os.read(self.stdout.fileno(), 1024)
                  except OSError:
                      pass
b1793a531   Simon Glass   patman: Update cr...
205
                  if not len(data):
71162e3ca   Simon Glass   patman: Add cros_...
206
207
208
                      self.stdout.close()
                      read_set.remove(self.stdout)
                  else:
b1793a531   Simon Glass   patman: Update cr...
209
210
                      stdout += data
                      combined += data
71162e3ca   Simon Glass   patman: Add cros_...
211
212
213
                      if output:
                          output(sys.stdout, data)
              if self.stderr in rlist:
b1793a531   Simon Glass   patman: Update cr...
214
                  data = b''
71162e3ca   Simon Glass   patman: Add cros_...
215
216
217
218
219
                  # We will get an error on read if the pty is closed
                  try:
                      data = os.read(self.stderr.fileno(), 1024)
                  except OSError:
                      pass
b1793a531   Simon Glass   patman: Update cr...
220
                  if not len(data):
71162e3ca   Simon Glass   patman: Add cros_...
221
222
223
                      self.stderr.close()
                      read_set.remove(self.stderr)
                  else:
b1793a531   Simon Glass   patman: Update cr...
224
225
                      stderr += data
                      combined += data
71162e3ca   Simon Glass   patman: Add cros_...
226
227
228
229
                      if output:
                          output(sys.stderr, data)
  
          # All data exchanged.    Translate lists into strings.
b1793a531   Simon Glass   patman: Update cr...
230
231
232
          stdout = self.ConvertData(stdout)
          stderr = self.ConvertData(stderr)
          combined = self.ConvertData(combined)
71162e3ca   Simon Glass   patman: Add cros_...
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
  
          # Translate newlines, if requested.    We cannot let the file
          # object do the translation: It is based on stdio, which is
          # impossible to combine with select (unless forcing no
          # buffering).
          if self.universal_newlines and hasattr(file, 'newlines'):
              if stdout:
                  stdout = self._translate_newlines(stdout)
              if stderr:
                  stderr = self._translate_newlines(stderr)
  
          self.wait()
          return (stdout, stderr, combined)
  
  
  # Just being a unittest.TestCase gives us 14 public methods.    Unless we
  # disable this, we can only have 6 tests in a TestCase.    That's not enough.
  #
  # pylint: disable=R0904
  
  class TestSubprocess(unittest.TestCase):
      """Our simple unit test for this module"""
  
      class MyOperation:
          """Provides a operation that we can pass to Popen"""
          def __init__(self, input_to_send=None):
              """Constructor to set up the operation and possible input.
  
              Args:
                  input_to_send: a text string to send when we first get input. We will
                      add \r
   to the string.
              """
              self.stdout_data = ''
              self.stderr_data = ''
              self.combined_data = ''
              self.stdin_pipe = None
              self._input_to_send = input_to_send
              if input_to_send:
                  pipe = os.pipe()
                  self.stdin_read_pipe = pipe[0]
                  self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
  
          def Output(self, stream, data):
              """Output handler for Popen. Stores the data for later comparison"""
              if stream == sys.stdout:
                  self.stdout_data += data
              if stream == sys.stderr:
                  self.stderr_data += data
              self.combined_data += data
  
              # Output the input string if we have one.
              if self._input_to_send:
                  self._stdin_write_pipe.write(self._input_to_send + '\r
  ')
                  self._stdin_write_pipe.flush()
  
      def _BasicCheck(self, plist, oper):
          """Basic checks that the output looks sane."""
          self.assertEqual(plist[0], oper.stdout_data)
          self.assertEqual(plist[1], oper.stderr_data)
          self.assertEqual(plist[2], oper.combined_data)
  
          # The total length of stdout and stderr should equal the combined length
          self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
  
      def test_simple(self):
          """Simple redirection: Get process list"""
          oper = TestSubprocess.MyOperation()
          plist = Popen(['ps']).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
  
      def test_stderr(self):
          """Check stdout and stderr"""
          oper = TestSubprocess.MyOperation()
          cmd = 'echo fred >/dev/stderr && false || echo bad'
          plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(plist [0], 'bad\r
  ')
          self.assertEqual(plist [1], 'fred\r
  ')
  
      def test_shell(self):
          """Check with and without shell works"""
          oper = TestSubprocess.MyOperation()
          cmd = 'echo test >/dev/stderr'
          self.assertRaises(OSError, Popen, [cmd], shell=False)
          plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(len(plist [0]), 0)
          self.assertEqual(plist [1], 'test\r
  ')
  
      def test_list_args(self):
          """Check with and without shell works using list arguments"""
          oper = TestSubprocess.MyOperation()
          cmd = ['echo', 'test', '>/dev/stderr']
          plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r
  ')
          self.assertEqual(len(plist [1]), 0)
  
          oper = TestSubprocess.MyOperation()
  
          # this should be interpreted as 'echo' with the other args dropped
          cmd = ['echo', 'test', '>/dev/stderr']
          plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(plist [0], '\r
  ')
  
      def test_cwd(self):
          """Check we can change directory"""
          for shell in (False, True):
              oper = TestSubprocess.MyOperation()
              plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
              self._BasicCheck(plist, oper)
              self.assertEqual(plist [0], '/tmp\r
  ')
  
      def test_env(self):
          """Check we can change environment"""
          for add in (False, True):
              oper = TestSubprocess.MyOperation()
              env = os.environ
              if add:
                  env ['FRED'] = 'fred'
              cmd = 'echo $FRED'
              plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
              self._BasicCheck(plist, oper)
              self.assertEqual(plist [0], add and 'fred\r
  ' or '\r
  ')
  
      def test_extra_args(self):
          """Check we can't add extra arguments"""
          self.assertRaises(ValueError, Popen, 'true', close_fds=False)
  
      def test_basic_input(self):
          """Check that incremental input works
  
          We set up a subprocess which will prompt for name. When we see this prompt
          we send the name as input to the process. It should then print the name
          properly to stdout.
          """
          oper = TestSubprocess.MyOperation('Flash')
          prompt = 'What is your name?: '
          cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
          plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                  shell=True).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(len(plist [1]), 0)
          self.assertEqual(plist [0], prompt + 'Hello Flash\r\r
  ')
  
      def test_isatty(self):
          """Check that ptys appear as terminals to the subprocess"""
          oper = TestSubprocess.MyOperation()
          cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
                  'else echo "not %d" >&%d; fi;')
          both_cmds = ''
          for fd in (1, 2):
              both_cmds += cmd % (fd, fd, fd, fd, fd)
          plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(plist [0], 'terminal 1\r
  ')
          self.assertEqual(plist [1], 'terminal 2\r
  ')
  
          # Now try with PIPE and make sure it is not a terminal
          oper = TestSubprocess.MyOperation()
          plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                  shell=True).CommunicateFilter(oper.Output)
          self._BasicCheck(plist, oper)
          self.assertEqual(plist [0], 'not 1
  ')
          self.assertEqual(plist [1], 'not 2
  ')
  
  if __name__ == '__main__':
      unittest.main()