utils.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import subprocess
  2. import time
  3. import os
  4. import logging
  5. import shlex
  6. class ShellUtils():
  7. def __init__(self, directory, outfile, errfile, logger=None):
  8. '''
  9. outfile: A file-like object to write command output to.
  10. Must have write(string) method. Common choices are
  11. files, sys.stdout, or WrapLogger objects
  12. errfile: See outfile
  13. logger : If provided, used instead of outfile/errfile for
  14. finer-grained logging
  15. '''
  16. # Advanced notes: outfile and errfile do *not* have to be
  17. # thread-safe objects. They are only ever written to from one
  18. # thread at a time *unless* someone calls sh_async twice with
  19. # the same ShellUtils
  20. self.directory = directory
  21. self.outfile = outfile
  22. self.errfile = errfile
  23. self.logger = logger
  24. self.os = os.name
  25. def __write_out(self, message, level=logging.INFO, stream=None):
  26. if self.logger:
  27. self.logger.log(level, message)
  28. elif stream == None:
  29. self.outfile.write(message)
  30. else:
  31. stream.write(message)
  32. def __write_err(self, message, level=logging.ERROR):
  33. self.__write_out(message, level, stream=self.errfile)
  34. def sh(self, command, **kwargs):
  35. '''Run a shell command, sending output to outfile and errfile.
  36. Blocks until command exits'''
  37. kwargs.setdefault('cwd', self.directory)
  38. kwargs.setdefault('executable', '/bin/bash')
  39. self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
  40. try:
  41. output = subprocess.check_output(command, shell=True, stderr=self.errfile, **kwargs)
  42. if output and output.strip():
  43. self.__write_out("Output:")
  44. self.__write_out(output.rstrip('\n'))
  45. else:
  46. self.__write_out("No Output")
  47. except subprocess.CalledProcessError:
  48. self.__write_err("Command returned non-zero exit code: %s" % command)
  49. def sh_async(self, command, group=True, **kwargs):
  50. '''
  51. Run a shell command, continually sending output to outfile and errfile until
  52. shell process completes
  53. - If group is set, create a process group that can later be used to kill all subprocesses
  54. Returns the pid of the newly created process (or process group)
  55. '''
  56. # Setup extra args
  57. kwargs.setdefault('cwd', self.directory)
  58. if group:
  59. if self.os != 'nt':
  60. kwargs.setdefault('preexec_fn', os.setpgrp)
  61. else:
  62. # TODO if someone could make this work that would be great
  63. self.__write_err("Unable to group flag on Windows")
  64. self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
  65. # Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
  66. process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
  67. NonBlockingStreamLogger(process, self.logger, name=shlex.split(command)[0])
  68. return process.pid
  69. def sh_pkill(self, group_id, name=None, usesudo=False):
  70. '''
  71. Kill processes that match all the passed arguments
  72. Set group_id if you used sh_async
  73. Set usesudo only if you started these processes with sudo
  74. # TODO - consider os.pgkill?
  75. '''
  76. command = "pkill "
  77. command = "%s -g %s" % (command, group_id)
  78. if name:
  79. command = "%s %s" % (command, name)
  80. if usesudo:
  81. command = "sudo %s" % command
  82. self.sh(command)
  83. from threading import Thread
  84. from Queue import Queue, Empty
# TODO - no need to use a daemon; kill this off in stop!
# NOTE - it is safe to use the logging module in a multi-threaded
# system, but not safe to log to the same file across multiple
# processes. Our system has two main processes (main and __run_test),
# and lots of minor ones from 'subprocess'. As long as we only use
# one logger inside TestRunner and NonBlockingFoo, we are good.
# Credit: http://eyalarubas.com/python-subproc-nonblock.html
  92. class NonBlockingStreamReader:
  93. def __init__(self, stream):
  94. self._s = stream
  95. self._q = Queue()
  96. def _populateQueue(stream, queue):
  97. for line in iter(stream.readline, b''):
  98. queue.put(line)
  99. self._t = Thread(target = _populateQueue,
  100. args = (self._s, self._q))
  101. self._t.daemon = True
  102. self._t.start() #start collecting lines from the stream
  103. # TODO - This is only returning one line, if it is available.
  104. def readline(self, timeout = None):
  105. try:
  106. return self._q.get(block = timeout is not None,
  107. timeout = timeout)
  108. except Empty:
  109. return None
  110. def read(self):
  111. lines = []
  112. while True:
  113. line = self.readline(0.1)
  114. if not line:
  115. return lines
  116. lines.append(line)
  117. import logging
  118. from threading import Thread
  119. import threading
  120. # NonBlockingStreamLogger(p, logging.getLogger())
  121. # p = subprocess.Popen("asdfasdf", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
  122. class NonBlockingStreamLogger:
  123. '''Reads from a subprocess' streams and writes them to the
  124. provided logger as long as the subprocess is alive'''
  125. def __init__(self, process, logger, logout=logging.INFO, logerr=logging.ERROR, name=None):
  126. self.process = process
  127. self.logger = logger
  128. self.out_level = logout
  129. self.err_level = logerr
  130. if name:
  131. self.prefix = "Process '%s':" % name
  132. else:
  133. self.logger.warning("No name provided for process %s", process.pid)
  134. self.prefix = "Process '%s':" % process.pid
  135. name = process.pid
  136. outThread = Thread(target = self._readStream,
  137. args = (process.stdout, self.out_level),
  138. name = "%s - stdout" % name)
  139. outThread.daemon = True
  140. outThread.start()
  141. errThread = Thread(target = self._readStream,
  142. args = (process.stderr, self.err_level),
  143. name = "%s - stderr" % name)
  144. errThread.daemon = True
  145. errThread.start()
  146. def _readStream(self, stream, level):
  147. self.logger.debug("%s Waiting for output (%s)", self.prefix, threading.current_thread().name)
  148. for line in iter(stream.readline, b''):
  149. self.logger.log(level, "%s %s", self.prefix, line.rstrip('\n'))
  150. # Has process died?
  151. if self.process.poll() == 0:
  152. self.logger.debug("%s Death. Reading remainder of stream", self.prefix)
  153. remainder = stream.read()
  154. for line2 in remainder.split(b'\n'):
  155. self.logger.log(level, "%s %s", self.prefix, line2)
  156. break
  157. self.logger.debug("%s Complete (%s)", self.prefix, threading.current_thread().name)
  158. return 0
  159. import tempfile
  160. class WrapLogger():
  161. """
  162. Used to convert a Logger into a file-like object. Adds easy integration
  163. of Logger into subprocess, which takes file parameters for stdout
  164. and stderr.
  165. Use:
  166. (out, err) = WrapLogger(logger, logging.INFO), WrapLogger(logger, logging.ERROR)
  167. subprocess.Popen(command, stdout=out, stderr=err)
  168. Note: When used with subprocess, this cannot guarantee that output will appear
  169. in real time. This is because subprocess tends to bypass the write() method and
  170. access the underlying file directly. This will eventually collect any output
  171. that was sent directly to the file, but it cannot do this in real time.
  172. Practically, this limitation means that WrapLogger is safe to use with
  173. all synchronous subprocess calls, but it will lag heavily with
  174. subprocess.Popen calls
  175. """
  176. # Note - Someone awesome with python could make this fully implement the file
  177. # interface, and remove the real-time limitation
  178. def __init__(self, logger, level):
  179. self.logger = logger
  180. self.level = level
  181. self.file = tempfile.TemporaryFile()
  182. def write(self, message):
  183. self.logger.log(self.level, message)
  184. def __getattr__(self, name):
  185. return getattr(self.file, name)
  186. def __del__(self):
  187. """Grabs any output that was written directly to the file (e.g. bypassing
  188. the write method). Subprocess.call, Popen, etc have a habit of accessing
  189. the file directly for faster writing. See http://bugs.python.org/issue1631
  190. """
  191. self.file.seek(0)
  192. for line in self.file.readlines():
  193. self.logger.log(self.level, line.rstrip('\n'))
  194. class Header():
  195. """
  196. """
  197. def __init__(self, message, top='-', bottom='-'):
  198. self.message = message
  199. self.top = top
  200. self.bottom = bottom
  201. def __str__(self):
  202. topheader = self.top * 80
  203. topheader = topheader[:80]
  204. bottomheader = self.bottom * 80
  205. bottomheader = bottomheader[:80]
  206. return "\n%s\n %s\n%s" % (topheader, self.message, bottomheader)