# utils.py
import logging
import os
import shlex
import subprocess
import time
  5. class ShellUtils():
  6. def __init__(self, directory, outfile, errfile, logger=None):
  7. '''
  8. outfile: A file-like object to write command output to.
  9. Must have write(string) method. Common choices are
  10. files, sys.stdout, or WrapLogger objects
  11. errfile: See outfile
  12. logger : If provided, used instead of outfile/errfile for
  13. finer-grained logging
  14. '''
  15. # Advanced notes: outfile and errfile do *not* have to be
  16. # thread-safe objects. They are only ever written to from one
  17. # thread at a time *unless* someone calls sh_async twice with
  18. # the same ShellUtils
  19. self.directory = directory
  20. self.outfile = outfile
  21. self.errfile = errfile
  22. self.logger = logger
  23. def __write_out(self, message, level=logging.INFO, stream=None):
  24. if self.logger:
  25. self.logger.log(level, message)
  26. elif stream == None:
  27. self.outfile.write(message)
  28. else:
  29. stream.write(message)
  30. def __write_err(self, message, level=logging.ERROR):
  31. self.__write_out(message, level, stream=self.errfile)
  32. def sh(self, command, **kwargs):
  33. '''Run a shell command, sending output to outfile and errfile.
  34. Blocks until command exits'''
  35. kwargs.setdefault('cwd', self.directory)
  36. kwargs.setdefault('executable', '/bin/bash')
  37. self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
  38. try:
  39. output = subprocess.check_output(command, shell=True, stderr=self.errfile, **kwargs)
  40. if output and output.strip():
  41. self.__write_out("Output:")
  42. self.__write_out(output.rstrip('\n'))
  43. else:
  44. self.__write_out("No Output")
  45. except subprocess.CalledProcessError:
  46. self.__write_err("Command returned non-zero exit code: %s" % command)
  47. def sh_async(self, command, group=True, **kwargs):
  48. '''
  49. Run a shell command, continually sending output to outfile and errfile until
  50. shell process completes
  51. - If group is set, create a process group that can later be used to kill all subprocesses
  52. Returns the pid of the newly created process (or process group)
  53. '''
  54. # Setup extra args
  55. kwargs.setdefault('cwd', self.directory)
  56. if group:
  57. if self.os != 'nt':
  58. kwargs.setdefault('preexec_fn', os.setpgrp)
  59. else:
  60. # TODO if someone could make this work that would be great
  61. self.__write_err("Unable to group flag on Windows")
  62. self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
  63. # Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
  64. process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
  65. NonBlockingStreamLogger(process, self.logger, name=shlex.split(command)[0])
  66. return process.pid
  67. def sh_pkill(self, group_id, name=None, usesudo=False):
  68. '''
  69. Kill processes that match all the passed arguments
  70. Set group_id if you used sh_async
  71. Set usesudo only if you started these processes with sudo
  72. # TODO - consider os.pgkill?
  73. '''
  74. command = "pkill "
  75. command = "%s -g %s" % (command, group_id)
  76. if name:
  77. command = "%s %s" % (command, name)
  78. if usesudo:
  79. command = "sudo %s" % command
  80. self.sh(command)
  81. from threading import Thread
  82. from Queue import Queue, Empty
# TODO - no need to use a daemon; kill this off in stop!
# NOTE - it is safe to use the logging module in a multi-threaded
# system, but not safe to log to the same file across multiple
# processes. Our system has two main processes (main and __run_test),
# and lots of minor ones from 'subprocess'. As long as we only use
# one logger inside TestRunner and NonBlockingFoo, we are good.
# Credit: http://eyalarubas.com/python-subproc-nonblock.html
  90. class NonBlockingStreamReader:
  91. def __init__(self, stream):
  92. self._s = stream
  93. self._q = Queue()
  94. def _populateQueue(stream, queue):
  95. for line in iter(stream.readline, b''):
  96. queue.put(line)
  97. self._t = Thread(target = _populateQueue,
  98. args = (self._s, self._q))
  99. self._t.daemon = True
  100. self._t.start() #start collecting lines from the stream
  101. # TODO - This is only returning one line, if it is available.
  102. def readline(self, timeout = None):
  103. try:
  104. return self._q.get(block = timeout is not None,
  105. timeout = timeout)
  106. except Empty:
  107. return None
  108. def read(self):
  109. lines = []
  110. while True:
  111. line = self.readline(0.1)
  112. if not line:
  113. return lines
  114. lines.append(line)
  115. import logging
  116. from threading import Thread
  117. import threading
# Example (Popen must come first so its pipes exist to be read):
#   p = subprocess.Popen("asdfasdf", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
#   NonBlockingStreamLogger(p, logging.getLogger())
  120. class NonBlockingStreamLogger:
  121. '''Reads from a subprocess' streams and writes them to the
  122. provided logger as long as the subprocess is alive'''
  123. def __init__(self, process, logger, logout=logging.INFO, logerr=logging.ERROR, name=None):
  124. self.process = process
  125. self.logger = logger
  126. self.out_level = logout
  127. self.err_level = logerr
  128. if name:
  129. self.prefix = "Process '%s':" % name
  130. else:
  131. self.logger.warning("No name provided for process %s", process.pid)
  132. self.prefix = "Process '%s':" % process.pid
  133. name = process.pid
  134. outThread = Thread(target = self._readStream,
  135. args = (process.stdout, self.out_level),
  136. name = "%s - stdout" % name)
  137. outThread.daemon = True
  138. outThread.start()
  139. errThread = Thread(target = self._readStream,
  140. args = (process.stderr, self.err_level),
  141. name = "%s - stderr" % name)
  142. errThread.daemon = True
  143. errThread.start()
  144. def _readStream(self, stream, level):
  145. self.logger.debug("%s Waiting for output (%s)", self.prefix, threading.current_thread().name)
  146. for line in iter(stream.readline, b''):
  147. self.logger.log(level, "%s %s", self.prefix, line.rstrip('\n'))
  148. # Has process died?
  149. if self.process.poll() == 0:
  150. self.logger.debug("%s Death. Reading remainder of stream", self.prefix)
  151. remainder = stream.read()
  152. for line2 in remainder.split(b'\n'):
  153. self.logger.log(level, "%s %s", self.prefix, line2)
  154. break
  155. self.logger.debug("%s Complete (%s)", self.prefix, threading.current_thread().name)
  156. return 0
  157. import tempfile
  158. class WrapLogger():
  159. """
  160. Used to convert a Logger into a file-like object. Adds easy integration
  161. of Logger into subprocess, which takes file parameters for stdout
  162. and stderr.
  163. Use:
  164. (out, err) = WrapLogger(logger, logging.INFO), WrapLogger(logger, logging.ERROR)
  165. subprocess.Popen(command, stdout=out, stderr=err)
  166. Note: When used with subprocess, this cannot guarantee that output will appear
  167. in real time. This is because subprocess tends to bypass the write() method and
  168. access the underlying file directly. This will eventually collect any output
  169. that was sent directly to the file, but it cannot do this in real time.
  170. Practically, this limitation means that WrapLogger is safe to use with
  171. all synchronous subprocess calls, but it will lag heavily with
  172. subprocess.Popen calls
  173. """
  174. # Note - Someone awesome with python could make this fully implement the file
  175. # interface, and remove the real-time limitation
  176. def __init__(self, logger, level):
  177. self.logger = logger
  178. self.level = level
  179. self.file = tempfile.TemporaryFile()
  180. def write(self, message):
  181. self.logger.log(self.level, message)
  182. def __getattr__(self, name):
  183. return getattr(self.file, name)
  184. def __del__(self):
  185. """Grabs any output that was written directly to the file (e.g. bypassing
  186. the write method). Subprocess.call, Popen, etc have a habit of accessing
  187. the file directly for faster writing. See http://bugs.python.org/issue1631
  188. """
  189. self.file.seek(0)
  190. for line in self.file.readlines():
  191. self.logger.log(self.level, line.rstrip('\n'))
  192. class Header():
  193. """
  194. """
  195. def __init__(self, message, top='-', bottom='-'):
  196. self.message = message
  197. self.top = top
  198. self.bottom = bottom
  199. def __str__(self):
  200. topheader = self.top * 80
  201. topheader = topheader[:80]
  202. bottomheader = self.bottom * 80
  203. bottomheader = bottomheader[:80]
  204. return "\n%s\n %s\n%s" % (topheader, self.message, bottomheader)