Browse Source

Create NonBlockingStreamLogger class

Automatically handles directing a subprocess' stdout and stderr to a
logger
Hamilton Turner 11 years ago
parent
commit
8824dfa9da
1 changed files with 69 additions and 23 deletions
  1. 69 23
      toolset/benchmark/utils.py

+ 69 - 23
toolset/benchmark/utils.py

@@ -2,6 +2,8 @@
 import subprocess
 import subprocess
 import time
 import time
 import logging
 import logging
+import shlex
+
 class ShellUtils():
 class ShellUtils():
   def __init__(self, directory, outfile, errfile, logger=None):
   def __init__(self, directory, outfile, errfile, logger=None):
     '''
     '''
@@ -51,33 +53,34 @@ class ShellUtils():
   # TODO modify this to start the subcommand as a new process group, so that 
   # TODO modify this to start the subcommand as a new process group, so that 
   # we can automatically kill the entire group!
   # we can automatically kill the entire group!
   def sh_async(self, command, initial_logs=True, **kwargs):
   def sh_async(self, command, initial_logs=True, **kwargs):
-    '''Run a shell command, sending output to outfile and errfile.
-    If intial_logs, prints out logs for a few seconds before returning. '''
-    # TODO add this - '''Continues to send output until command completes'''
+    '''
+    Run a shell command, continually sending output to outfile and errfile until 
+    shell process completes
+    '''
     kwargs.setdefault('cwd', self.directory)
     kwargs.setdefault('cwd', self.directory)
     
     
-    # Open in line-buffered mode (bufsize=1) because NonBlockingStreamReader uses readline anyway
     self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
     self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
+    # Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
     process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
     process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
-    nbsr = NonBlockingStreamReader(process.stdout)
-    nbsr_err = NonBlockingStreamReader(process.stderr) 
-    if initial_logs:
-      time.sleep(8)
-      # TODO put this read into a tight loop to prevent deadlock due to 
-      # filling up OS buffers
-      out = nbsr.read()
-      if len(out) == 0:
-        self.outfile.write("No output")
-      else: 
-        self.outfile.write("Initial Output:")
-        for line in out:
-            self.outfile.write(line.rstrip('\n'))
-        
-      err = nbsr_err.read()
-      if len(err) != 0:
-        self.errfile.write("Initial Error Logs:")
-        for line in err:
-          self.errfile.write(line.rstrip('\n'))
+    NonBlockingStreamLogger(process, self.logger, name=shlex.split(command)[0])
+    return process.pid
+
+  def sh_pkill(self, group_id, name=None, usesudo=False):
+    '''
+    Kill processes that match all the passed arguments
+    Set group_id if you used sh_async
+    Set usesudo only if you started these processes with sudo
+    # TODO - consider os.pgkill?
+    '''
+    command = "pkill "
+    command = "%s -g %s" % (command, group_id)
+
+    if name:
+      command = "%s %s" % (command, name)
+    if usesudo:
+      command = "sudo %s" % command
+    self.sh(command)
+
 
 
 from threading import Thread
 from threading import Thread
 from Queue import Queue, Empty
 from Queue import Queue, Empty
@@ -119,6 +122,49 @@ class NonBlockingStreamReader:
         return lines
         return lines
       lines.append(line)
       lines.append(line)
 
 
+import logging
+from threading import Thread
+import threading
+# NonBlockingStreamLogger(p, logging.getLogger())
+# p = subprocess.Popen("asdfasdf", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+class NonBlockingStreamLogger:
+  '''Reads from a subprocess' streams and writes them to the 
+  provided logger as long as the subprocess is alive'''
+  def __init__(self, process, logger, logout=logging.INFO, logerr=logging.ERROR, name=None):
+    self.process = process
+    self.logger = logger
+    self.out_level = logout
+    self.err_level = logerr
+    if name:
+      self.prefix = "Process '%s':" % name
+    else: 
+      self.logger.warning("No name provided for process %s", process.pid)
+      self.prefix = "Process '%s':" % process.pid
+      name = process.pid
+    outThread = Thread(target = self._readStream,
+                args = (process.stdout, self.out_level),
+                name = "%s - stdout" % name)
+    outThread.daemon = True
+    outThread.start()
+    errThread = Thread(target = self._readStream,
+                args = (process.stderr, self.err_level),
+                name = "%s - stderr" % name)
+    errThread.daemon = True
+    errThread.start()
+  def _readStream(self, stream, level):
+    self.logger.debug("%s Waiting for output (%s)", self.prefix, threading.current_thread().name)
+    for line in iter(stream.readline, b''):
+      self.logger.log(level, "%s %s", self.prefix, line.rstrip('\n'))
+      
+      # Has process died? 
+      if self.process.poll() == 0:
+        self.logger.debug("%s Death. Reading remainder of stream", self.prefix)
+        remainder = stream.read()
+        for line2 in remainder.split(b'\n'):
+          self.logger.log(level, "%s %s", self.prefix, line2)
+        break
+    self.logger.debug("%s Complete (%s)", self.prefix, threading.current_thread().name)
+    return 0
 import tempfile
 import tempfile
 class WrapLogger():
 class WrapLogger():
   """
   """