Browse Source

Code cleanup

Hamilton Turner 10 years ago
parent
commit
5c396b28ec
2 changed files with 142 additions and 102 deletions
  1. 87 101
      toolset/benchmark/framework_test.py
  2. 55 1
      toolset/setup/linux/setup_util.py

+ 87 - 101
toolset/benchmark/framework_test.py

@@ -24,46 +24,6 @@ from utils import header
 from datetime import datetime
 from datetime import timedelta
 
-from threading import Thread
-from Queue import Queue, Empty
-
-class NonBlockingStreamReader:
-  def __init__(self, stream, eof_message = None):
-    '''
-    stream: the stream to read from.
-            Usually a process' stdout or stderr.
-    '''
-
-    self._s = stream
-    self._q = Queue()
-    self._eof_message = eof_message
-
-    def _populateQueue(stream, queue):
-      '''
-      Collect lines from 'stream' and put them in 'queue'.
-      '''
-
-      while True:
-        line = stream.readline()
-        if line:
-          queue.put(line)
-        else:
-          if self._eof_message:
-            sys.stdout.write(self._eof_message + '\n')
-          return
-
-    self._t = Thread(target = _populateQueue,
-            args = (self._s, self._q))
-    self._t.daemon = True
-    self._t.start() #start collecting lines from the stream
-
-  def readline(self, timeout = None):
-    try:
-      return self._q.get(block = timeout is not None,
-        timeout = timeout)
-    except Empty:
-      return None
-
 class FrameworkTest:
   headers_template = "-H 'Host: localhost' -H '{accept}' -H 'Connection: keep-alive'"
  
@@ -204,22 +164,28 @@ class FrameworkTest:
   # Start the test using its setup file
   ############################################################
   def start(self, out, err):
-    # Load profile for this installation
-    profile="$FWROOT/config/benchmark_profile"
 
-    # Setup variables for TROOT and IROOT
-    setup_util.replace_environ(config=profile, 
-              command='export TROOT=%s && export IROOT=%s && export DBHOST=%s && export MAX_THREADS=%s && export OUT=%s && export ERR=%s' %
-              (self.directory, self.install_root, self.database_host, self.benchmarker.threads, os.path.join(self.fwroot, out.name), os.path.join(self.fwroot, err.name)))
-
-    # Run the module start (inside parent of TROOT)
-    #     - we use the parent as a historical accident - a lot of tests
-    #       use subprocess's cwd argument already
+    # Setup environment variables
+    setup_util.replace_environ(config='$FWROOT/config/benchmark_profile', 
+              command='''\
+              export TROOT=%s       && \
+              export IROOT=%s       && \
+              export DBHOST=%s      && \
+              export MAX_THREADS=%s    \
+              ''' % (
+                self.directory, 
+                self.install_root, 
+                self.database_host, 
+                self.benchmarker.threads))
+
+    # Run the module start inside parent of TROOT
+    #  - we use the parent as a historical accident, a number of tests
+    # refer to their TROOT manually still
     previousDir = os.getcwd()
     os.chdir(os.path.dirname(self.troot))
     logging.info("Running setup module start (cwd=%s)", self.directory)
       
-    # Run the start script for the test as the "testrunner" user.
+    # Run the start script for the test as the "testrunner" user
     # 
     # `sudo` - Switching user requires superuser privs
     #   -u [username] The username
@@ -232,12 +198,16 @@ class FrameworkTest:
     #   -e Force bash to exit on first error
     #   -x Turn on bash tracing e.g. print commands before running
     #
-    # Most servers do not output to stdout/stderr while 
-    # serving requests so there is no performance hit from disabling 
-    # output buffering. Disabling is necessary to 
-    # a) allowing us to show output in real time b) avoiding lost 
-    # output in the buffer when the testrunner user is forcibly killed
+    # Most servers do not output to stdout/stderr while serving 
+    # requests so there is no performance hit from disabling 
+    # output buffering. This disabling is necessary to 
+    # a) allow TFB to show output in real time and b) avoid losing 
+    # output in the buffer when the testrunner processes are forcibly 
+    # killed
+    # 
     # See http://www.pixelbeat.org/programming/stdio_buffering/
+    # See https://blogs.gnome.org/markmc/2013/06/04/async-io-and-python/
+    # See http://eyalarubas.com/python-subproc-nonblock.html
     command = 'sudo -u %s -E -H stdbuf -o0 -e0 bash -ex %s.sh' % (self.benchmarker.runner_user, self.setup_file)
     
     debug_command = '''\
@@ -258,90 +228,106 @@ class FrameworkTest:
         os.path.join(self.fwroot, err.name),
         self.directory,
         command)
-    logging.info("To run framework manually, copy/paste this:\n%s", debug_command)
+    logging.info("To run %s manually, copy/paste this:\n%s", self.name, debug_command)
+
 
     def tee_output(prefix, line):
+      # Needs to be one atomic write
+      # Explicitly use UTF-8 as it's the most common framework output 
+      # TODO improve encoding handling 
+      line = prefix.encode('utf-8') + line
+
       # Log to current terminal
-      # Needs to be one atomic write, so we join because 
-      # list operations are faster than string concat
-      sys.stdout.write(u''.encode('utf-8').join([prefix, line]))
+      sys.stdout.write(line)
       sys.stdout.flush()
       # logging.error("".join([prefix, line]))
 
-    # Goal: Stream output of both benchmark toolset and 
-    # server to the console and to a file
-    # Problem: Capturing output of subprocess and children
-    # Solution: Use pipes provided by python
-    # Future-proof: Add unit tests that ensure this code works in all situations
-    # 
-    # https://blogs.gnome.org/markmc/2013/06/04/async-io-and-python/
-    # http://eyalarubas.com/python-subproc-nonblock.html
+      out.write(line)
+      out.flush()
 
+    # Start the setup.sh command
     p = subprocess.Popen(command, cwd=self.directory, 
-          shell=True, stdout=subprocess.PIPE, bufsize=0, 
+          shell=True, stdout=subprocess.PIPE, 
           stderr=subprocess.STDOUT)
-    nbsr = NonBlockingStreamReader(p.stdout, "Processes for %s have terminated" % self.name)
+    nbsr = setup_util.NonBlockingStreamReader(p.stdout, 
+      "%s: Setup.sh and framework processes have terminated" % self.name)
 
+    # Setup a timeout
     timeout = datetime.now() + timedelta(minutes = 10)
     time_remaining = timeout - datetime.now()
 
-    # Flush output until setup.sh process is finished. This is 
+    # Flush output until setup.sh work is finished. This is 
     # either a) when setup.sh exits b) when the port is bound
-    # c) when we run out of time
+    # c) when we run out of time. Note that 'finished' doesn't 
+    # guarantee setup.sh process is dead - the OS may choose to make 
+    # setup.sh a zombie process if it still has living children
     #
-    # Note: child processes forked using & will still be alive
-    # and directing their output to the pipes. E.g. even after 
-    # this loop dies the pipes are used to capture stdout/err from
-    # the running server
+    # Note: child processes forked (using &) will remain alive 
+    # after setup.sh has exited. They will have inherited the 
+    # stdout/stderr descriptors and will be directing their 
+    # output to the pipes. 
     #
-    # Explicitly set our prefix encoding
-    prefix = (u"Setup %s: " % self.name).encode('utf-8')
+    prefix = "Setup %s: " % self.name
     while not (p.poll() 
       or self.benchmarker.is_port_bound(self.port)
       or time_remaining.total_seconds() < 0):
       
       # The conditions above are slow to check, so 
-      # we miss many lines of output if we only
-      # print one line per condition check. Adding a 
-      # tight loop here mitigates the effect
+      # we will delay output substantially if we only
+      # print one line per condition check. 
+      # Adding a tight loop here mitigates the effect, 
+      # ensuring that most of the output directly from 
+      # setup.sh is sent to tee_output before the outer
+      # loop exits and prints things like "setup.sh exited"
+      # 
       for i in xrange(10):
-        line = nbsr.readline(0.05)
-        if line:
-          tee_output(prefix, line)
+        try:
+          line = nbsr.readline(0.05)
+          if line:
+            tee_output(prefix, line)
+        except setup_util.EndOfStream:
+          tee_output(prefix, "Setup has terminated\n")
+          break
       time_remaining = timeout - datetime.now()
 
-    # Were we timed out?
+    # Did we time out?
     if time_remaining.total_seconds() < 0: 
-      print "Setup.sh timed out!!"  
+      tee_output(prefix, "Setup.sh timed out!! Aborting...\n")
       p.kill()
       return 1
 
-    # If setup.sh exited, use the return code
-    # Else return 0 if the port was bound
+    # What's our return code? 
+    # If setup.sh has terminated, use that code
+    # Otherwise, detect if the port was bound
     retcode = (p.poll() or 0 if self.benchmarker.is_port_bound(self.port) else 1)
     if p.poll():
-      print "Setup.sh exited with %s" % p.poll()
-    if self.benchmarker.is_port_bound(self.port):
-      print "Setup.sh exited due to bound port"
+      tee_output(prefix, "setup.sh process exited with %s\n" % p.poll())
+    elif self.benchmarker.is_port_bound(self.port):
+      tee_output(prefix, "setup.sh exited due to bound port\n")
 
     # Before we return control to the benchmarker, spin up a 
-    # thread to keep an eye on the pipes in case the server 
-    # spits anything to stdout/stderr
-
-    # TODO add exit condition
+    # thread to keep an eye on the pipes in case the running 
+    # framework uses stdout/stderr. Once all processes accessing
+    # the subprocess.PIPEs are dead, this thread will terminate. 
+    # Use a different prefix to indicate this is the framework 
+    # speaking
+    prefix = "Server %s: " % self.name
     def watch_child_pipes(nbsr, prefix):
-      while True: 
-        line = nbsr.readline(0.1)
-        if line:
-          tee_output(prefix, line)
-    prefix = (u"Server %s: " % self.name).encode('utf-8')
+      while True:
+        try:
+          line = nbsr.readline(60)
+          if line:
+            tee_output(prefix, line)
+        except setup_util.EndOfStream:
+          tee_output(prefix, "Framework processes have terminated\n")
+          return
+
     watch_thread = Thread(target = watch_child_pipes,
       args = (nbsr, prefix))
     watch_thread.daemon = True
     watch_thread.start()
 
     logging.info("Executed %s.sh, returning %s", self.setup_file, retcode)
-
     os.chdir(previousDir)
 
     return retcode

+ 55 - 1
toolset/setup/linux/setup_util.py

@@ -1,8 +1,62 @@
 import re
 import os
+import sys
 import subprocess
 import platform
 
+from threading import Thread
+from Queue import Queue, Empty
+
+class NonBlockingStreamReader:
+  '''
+  Enables calling readline in a non-blocking manner with a blocking stream, 
+  such as the ones returned from subprocess.Popen
+
+  Originally written by Eyal Arubas, who granted permission to use this inside TFB
+  See http://eyalarubas.com/python-subproc-nonblock.html
+  '''
+  def __init__(self, stream, eof_message = None):
+    '''
+    stream: the stream to read from.
+            Usually a process' stdout or stderr.
+    eof_message: A message to print to stdout as soon
+      as the stream's end is reached. Useful if you
+      want to track the exact moment a stream terminates
+    '''
+
+    self._s = stream
+    self._q = Queue()
+    self._eof_message = eof_message
+    self._poisonpill = 'MAGIC_POISONPILL_STRING'
+
+    def _populateQueue(stream, queue):
+      while True:
+        line = stream.readline()
+        if line: # 'data\n' or '\n'
+          queue.put(line)
+        else:    # '' e.g. EOF
+          if self._eof_message:
+            sys.stdout.write(self._eof_message + '\n')
+          queue.put(self._poisonpill)
+          return
+
+    self._t = Thread(target = _populateQueue,
+            args = (self._s, self._q))
+    self._t.daemon = True
+    self._t.start()
+
+  def readline(self, timeout = None):
+    try:
+      line = self._q.get(block = timeout is not None,
+        timeout = timeout)
+      if line == self._poisonpill: 
+        raise EndOfStream
+      return line
+    except Empty:
+      return None
+
+class EndOfStream(Exception): pass
+
 # Replaces all text found using the regular expression to_replace with the supplied replacement.
 def replace_text(file, to_replace, replacement):
     with open(file, "r") as conf:
@@ -29,7 +83,7 @@ def replace_environ(config=None, root=None, print_result=False, command='true'):
     
         # Clean up our current environment, preserving some important items
         mini_environ = {}
-        for envname in ['HOME', 'PATH', 'USER', 'LD_LIBRARY_PATH', 'PYTHONPATH', 'FWROOT', 'TRAVIS']:
+        for envname in ['HOME', 'PATH', 'LANG', 'USER', 'LD_LIBRARY_PATH', 'PYTHONPATH', 'FWROOT', 'TRAVIS']:
           if envname in os.environ:
             mini_environ[envname] = os.environ[envname]
         for key in os.environ: