Browse Source

Use NonBlockingStreamReader to capture subprocess output

Hamilton Turner 10 years ago
parent
commit
efcbdf9bbb
2 changed files with 121 additions and 126 deletions
  1. 7 7
      toolset/benchmark/benchmarker.py
  2. 114 119
      toolset/benchmark/framework_test.py

+ 7 - 7
toolset/benchmark/benchmarker.py

@@ -563,7 +563,7 @@ class Benchmarker:
 
         if self.__is_port_bound(test.port):
           # This can happen sometimes - let's try again
-          self.__stop_test(test, out, err)
+          self.__stop_test(out, err)
           out.flush()
           err.flush()
           time.sleep(15)
@@ -577,7 +577,7 @@ class Benchmarker:
 
         result = test.start(out, err)
         if result != 0: 
-          self.__stop_test(test, out, err)
+          self.__stop_test(out, err)
           time.sleep(5)
           err.write( "ERROR: Problem starting {name}\n".format(name=test.name) )
           err.flush()
@@ -611,14 +611,14 @@ class Benchmarker:
         ##########################
         out.write(header("Stopping %s" % test.name))
         out.flush()
-        self.__stop_test(test, out, err)
+        self.__stop_test(out, err)
         out.flush()
         err.flush()
         time.sleep(15)
 
         if self.__is_port_bound(test.port):
           # This can happen sometimes - let's try again
-          self.__stop_test(test, out, err)
+          self.__stop_test(out, err)
           out.flush()
           err.flush()
           time.sleep(15)
@@ -650,7 +650,7 @@ class Benchmarker:
         traceback.print_exc(file=err)
         err.flush()
         try:
-          self.__stop_test(test, out, err)
+          self.__stop_test(out, err)
         except (subprocess.CalledProcessError) as e:
           self.__write_intermediate_results(test.name,"<setup.py>#stop() raised an error")
           err.write(header("Subprocess Error: Test .stop() raised exception %s" % test.name))
@@ -662,7 +662,7 @@ class Benchmarker:
       # TODO - subprocess should not catch this exception!
       # Parent process should catch it and cleanup/exit
       except (KeyboardInterrupt) as e:
-        self.__stop_test(test, out, err)
+        self.__stop_test(out, err)
         out.write(header("Cleaning up..."))
         out.flush()
         self.__finish()
@@ -680,7 +680,7 @@ class Benchmarker:
   # __stop_test(benchmarker)
   # Stops all running tests
   ############################################################
-  def __stop_test(self, test, out, err):
+  def __stop_test(self, out, err):
     try:
       subprocess.check_call('sudo killall -s 9 -u %s' % self.runner_user, shell=True, stderr=err, stdout=out)
       retcode = 0

+ 114 - 119
toolset/benchmark/framework_test.py

@@ -21,6 +21,49 @@ from threading import Event
 
 from utils import header
 
+from datetime import datetime
+from datetime import timedelta
+
+from threading import Thread
+from Queue import Queue, Empty
+
+class NonBlockingStreamReader:
+  def __init__(self, stream):
+    '''
+    stream: the stream to read from.
+            Usually a process' stdout or stderr.
+    '''
+
+    self._s = stream
+    self._q = Queue()
+
+    def _populateQueue(stream, queue):
+      '''
+      Collect lines from 'stream' and put them in 'quque'.
+      '''
+
+      while True:
+        line = stream.readline()
+        if line:
+          queue.put(line)
+        else:
+          raise UnexpectedEndOfStream
+
+    self._t = Thread(target = _populateQueue,
+            args = (self._s, self._q))
+    self._t.daemon = True
+    self._t.start() #start collecting lines from the stream
+
+  def readline(self, timeout = None):
+    try:
+      return self._q.get(block = timeout is not None,
+        timeout = timeout)
+    except Empty:
+      return None
+
+class UnexpectedEndOfStream(Exception): pass
+
+
 class FrameworkTest:
   headers_template = "-H 'Host: localhost' -H '{accept}' -H 'Connection: keep-alive'"
  
@@ -209,134 +252,86 @@ class FrameworkTest:
         command)
     logging.info("To run framework manually, copy/paste this:\n%s", debug_command)
 
-    '''
-    # Write the stderr to our temp.txt file to be read and fed back
-    # to the user via logging later.
-    with open('temp', 'w') as errout:
-
-
-      try:
-        subprocess.check_call(command, cwd=self.directory, 
-          shell=True, stderr=errout, stdout=out)
-        retcode = 0
-      except Exception:
-        logging.exception("Failure running setup.sh")
-        retcode = 1
-    
-    with open('temp', 'r') as errout:
-      # Read out temp error output in its entirety
-      body = errout.read()
-      if len(body) > 0:
-        # Log it to the user.
-        logging.error(body)
-        # Log it to our err.txt file
-        err.write(body)
-    # We are done with our temp file - delete it
-    os.remove('temp')
-    
-    '''
-
-    # Run setup.sh
-    # Printing output until subprocess terminates
-    class RunSetup(Thread):
-      def __init__(self, command, directory, stopFlag, setupFinished, name):
-        Thread.__init__(self)
-        self.command = command
-        self.directory = directory
-        self.stopFlag = stopFlag
-        self.setupFinished = setupFinished
-        self.setupResult = None
-        self.name = name
-        self.prefix = "Setup %s: " % self.name
-
-      def get_setup_result(self):
-        return self.setupResult
-
-      def _output(self, line):
-        # Log to current terminal
-        # Needs to be one atomic write, so we join because 
-        # list operations are faster than string concat
-        sys.stdout.write("".join([self.prefix, line]))
-        sys.stdout.flush()
-        # logging.error(body)
-
-      # Goal: Stream output of both benchmark toolset and 
-      # server to the console and to a file
-      # Problem: Capturing output of subprocess and children
-      # Solution: Use pipes provided by python
-      # Future-proof: Add unit tests that ensure this code works in all situations
-
-      def run(self):
-        # Run in setup.sh in background, using line buffered output and PIPEs
-        p = subprocess.Popen(self.command, cwd=self.directory, 
+    def tee_output(prefix, line):
+      # Log to current terminal
+      # Needs to be one atomic write, so we join because 
+      # list operations are faster than string concat
+      sys.stdout.write(u''.encode('utf-8').join([prefix, line]))
+      sys.stdout.flush()
+      # logging.error("".join([prefix, line]))
+
+    # Goal: Stream output of both benchmark toolset and 
+    # server to the console and to a file
+    # Problem: Capturing output of subprocess and children
+    # Solution: Use pipes provided by python
+    # Future-proof: Add unit tests that ensure this code works in all situations
+    # 
+    # https://blogs.gnome.org/markmc/2013/06/04/async-io-and-python/
+    # http://eyalarubas.com/python-subproc-nonblock.html
+
+    p = subprocess.Popen(command, cwd=self.directory, 
           shell=True, stdout=subprocess.PIPE, bufsize=1, 
           stderr=subprocess.STDOUT)
-
-        # Flush output until setup.sh process dies. Note that 
-        # the child processes forked using & will still be alive
-        # and directing their output to the pipes
-        while p.poll() is None:
-          line = p.stdout.readline()
-          if line:
-            self._output(line)
-          else:
-            time.sleep(0.5)
-
-        self._output("Exited with code %s\n" % p.returncode)
-        self.prefix = "Server %s: " % self.name
-        self.setupResult = p.returncode
-        self.setupFinished.set()
-            
-        # Setup.sh process has terminated, now we watch for output from
-        # child processes until the framework_test#stop is called. 
-        # (This also captures any remaining output that may have happened 
-        # between readline and poll)
-        while self.stopFlag.wait(0.5):
-          line = p.stdout.readline()
-          if line:
-            self._output(line)
-          
-        # Grab any remaining output
-        pout = p.communicate()[0]
-        for line in pout.splitlines():
-          self._output(line)
-        
-    '''
-    from threading import Event
-    from threading import Thread
-    import subprocess
-    import sys
-    import time
-
-    command = 'sudo -u testrunner -E -H bash -e setup.sh'
-    directory='/home/vagrant/FrameworkBenchmarks/frameworks/CSharp/evhttp-sharp'
-    stopFlag = Event()
-    setup_thread = RunSetup(command, directory, stopFlag, 'evhttp')
-    # setup_thread.start()
-    '''
-
-    self.stopFlag = Event()
-    setupFinished = Event()
-    setup_thread = RunSetup(command, self.directory, self.stopFlag, setupFinished, self.name)
-    setup_thread.start()
-    
-    # We can return once the port is bound or the setup process is dead
-    while not setupFinished.wait(5):
-      sys.stderr.write("Waiting for start to return...\n")
-
-    logging.info("Executed %s.sh, returning %s", self.setup_file, setup_thread.get_setup_result())
+    nbsr = NonBlockingStreamReader(p.stdout)
+
+    timeout = datetime.now() + timedelta(minutes = 10)
+    time_remaining = timeout - datetime.now()
+
+    # Flush output until setup.sh process is finished. This is 
+    # either a) when setup.sh exits b) when the port is bound
+    # c) when we run out of time
+    #
+    # Note: child processes forked using & will still be alive
+    # and directing their output to the pipes. E.g. even after 
+    # this loop dies the pipes are used to capture stdout/err from
+    # the running server
+    #
+    # Explicitly set our prefix encoding
+    prefix = (u"Setup %s: " % self.name).encode('utf-8')
+    while not (p.poll() 
+      or self.benchmarker.is_port_bound(self.port)
+      or time_remaining.total_seconds() < 0):
+      
+      line = nbsr.readline(0.1)
+      if line:
+        tee_output(prefix, line)
+      time_remaining = timeout - datetime.now()
+
+    # Were we timed out?
+    if time_remaining.total_seconds() < 0: 
+      print "Setup.sh timed out!!"  
+      p.kill()
+      return 1
+
+    # If setup.sh exited, use the return code
+    # Else return 0 if the port was bound
+    retcode = (p.poll() or 0 if self.benchmarker.is_port_bound(self.port) else 1)
+
+    # Before we return control to the benchmarker, spin up a 
+    # thread to keep an eye on the pipes in case the server 
+    # spits anything to stdout/stderr
+
+    # TODO add exit condition
+    def watch_child_pipes(nbsr, prefix):
+      while True: 
+        line = nbsr.readline(0.1)
+        if line:
+          tee_output(prefix, line)
+    prefix = (u"Server %s: " % self.name).encode('utf-8')
+    watch_thread = Thread(target = watch_child_pipes,
+      args = (nbsr, prefix))
+    watch_thread.daemon = True
+    watch_thread.start()
+
+    logging.info("Executed %s.sh, returning %s", self.setup_file, retcode)
 
     os.chdir(previousDir)
 
-    return setup_thread.get_setup_result()
+    return retcode
   ############################################################
   # End start
   ############################################################
 
-  def stop(self, out, err):
-    if self.stopFlag: 
-      self.stopFlag.set()
-
   ############################################################
   # verify_urls
   # Verifys each of the URLs for this test. THis will sinply