Browse Source

Merge pull request #1505 from hamiltont/improve-log-handling

Improve framework server log handling
Brittany Mazza 10 years ago
parent
commit
7ebb3123f2

+ 6 - 1
toolset/benchmark/benchmarker.py

@@ -383,8 +383,10 @@ class Benchmarker:
       sudo sysctl -w kernel.shmmax=2147483648
       sudo sysctl -w kernel.shmmax=2147483648
       sudo sysctl -w kernel.shmall=2097152
       sudo sysctl -w kernel.shmall=2097152
       sudo sysctl -w kernel.sem="250 32000 256 512"
       sudo sysctl -w kernel.sem="250 32000 256 512"
-      echo "Printing kernel configuration:" && sudo sysctl -a
     """)
     """)
+    # TODO - print kernel configuration to file
+    # echo "Printing kernel configuration:" && sudo sysctl -a
+
         # Explanations:
         # Explanations:
         # net.ipv4.tcp_max_syn_backlog, net.core.somaxconn, kernel.sched_autogroup_enabled: http://tweaked.io/guide/kernel/
         # net.ipv4.tcp_max_syn_backlog, net.core.somaxconn, kernel.sched_autogroup_enabled: http://tweaked.io/guide/kernel/
         # ulimit -n: http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
         # ulimit -n: http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
@@ -692,6 +694,9 @@ class Benchmarker:
   # End __stop_test
   # End __stop_test
   ############################################################
   ############################################################
 
 
+  def is_port_bound(self, port):
+    return self.__is_port_bound(port)
+
   ############################################################
   ############################################################
   # __is_port_bound
   # __is_port_bound
   # Check if the requested port is available. If it
   # Check if the requested port is available. If it

+ 158 - 81
toolset/benchmark/framework_test.py

@@ -21,6 +21,9 @@ from threading import Event
 
 
 from utils import header
 from utils import header
 
 
+from datetime import datetime
+from datetime import timedelta
+
 class FrameworkTest:
 class FrameworkTest:
   headers_template = "-H 'Host: localhost' -H '{accept}' -H 'Connection: keep-alive'"
   headers_template = "-H 'Host: localhost' -H '{accept}' -H 'Connection: keep-alive'"
  
  
@@ -161,94 +164,168 @@ class FrameworkTest:
   # Start the test using its setup file
   # Start the test using its setup file
   ############################################################
   ############################################################
   def start(self, out, err):
   def start(self, out, err):
-    # Load profile for this installation
-    profile="$FWROOT/config/benchmark_profile"
-
-    # Setup variables for TROOT and IROOT
-    setup_util.replace_environ(config=profile, 
-              command='export TROOT=%s && export IROOT=%s && export DBHOST=%s && export MAX_THREADS=%s && export OUT=%s && export ERR=%s' %
-              (self.directory, self.install_root, self.database_host, self.benchmarker.threads, os.path.join(self.fwroot, out.name), os.path.join(self.fwroot, err.name)))
-
-    # Because start can take so long, we print a dot to let the user know 
-    # we are working
-    class ProgressPrinterThread(Thread):
-      def __init__(self, event):
-          Thread.__init__(self)
-          self.stopped = event
-
-      def run(self):
-        while not self.stopped.wait(20):
-          sys.stderr.write("Waiting for start to return...\n")
-    stopFlag = Event()
-    thread = ProgressPrinterThread(stopFlag)
-    thread.start()
-
-    # Run the module start (inside parent of TROOT)
-    #     - we use the parent as a historical accident - a lot of tests
-    #       use subprocess's cwd argument already
+
+    # Setup environment variables
+    setup_util.replace_environ(config='$FWROOT/config/benchmark_profile', 
+              command='''\
+              export TROOT=%s       && \
+              export IROOT=%s       && \
+              export DBHOST=%s      && \
+              export MAX_THREADS=%s    \
+              ''' % (
+                self.directory, 
+                self.install_root, 
+                self.database_host, 
+                self.benchmarker.threads))
+
+    # Run the module start inside parent of TROOT
+    #  - we use the parent as a historical accident, a number of tests
+    # refer to their TROOT manually still
     previousDir = os.getcwd()
     previousDir = os.getcwd()
     os.chdir(os.path.dirname(self.troot))
     os.chdir(os.path.dirname(self.troot))
     logging.info("Running setup module start (cwd=%s)", self.directory)
     logging.info("Running setup module start (cwd=%s)", self.directory)
       
       
-    # Write the stderr to our temp.txt file to be read and fed back
-    # to the user via logging later.
-    with open('temp', 'w') as errout:
-      # Run the start script for the test as the "testrunner" user.
-      # This requires superuser privs, so `sudo` is necessary.
-      #   -u [username] The username
-      #   -E Preserves the current environment variables
-      #   -H Forces the home var (~) to be reset to the user specified
-      #   -e Force bash to exit on first error
-      # Note: check_call is a blocking call, so any startup scripts
-      # run by the framework that need to continue (read: server has
-      # started and needs to remain that way), then they should be
-      # executed in the background.
-      command = 'sudo -u %s -E -H bash -e %s.sh' % (self.benchmarker.runner_user, self.setup_file)
-      
-      debug_command = '''\
-        export FWROOT=%s && \\
-        export TROOT=%s && \\
-        export IROOT=%s && \\
-        export DBHOST=%s && \\
-        export MAX_THREADS=%s && \\
-        export OUT=%s && \\
-        export ERR=%s && \\
-        cd %s && \\
-        %s''' % (self.fwroot, 
-          self.directory, 
-          self.install_root, 
-          self.database_host, 
-          self.benchmarker.threads, 
-          os.path.join(self.fwroot, out.name), 
-          os.path.join(self.fwroot, err.name),
-          self.directory,
-          command)
-      logging.info("To run framework manually, copy/paste this:\n%s", debug_command)
+    # Run the start script for the test as the "testrunner" user
+    # 
+    # `sudo` - Switching user requires superuser privs
+    #   -u [username] The username
+    #   -E Preserves the current environment variables
+    #   -H Forces the home var (~) to be reset to the user specified
+    # `stdbuf` - Disable buffering, send output to python ASAP
+    #   -o0 zero-sized buffer for stdout
+    #   -e0 zero-sized buffer for stderr
+    # `bash` - Run the setup.sh script using bash
+    #   -e Force bash to exit on first error
+    #   -x Turn on bash tracing, i.e. print commands before running
+    #
+    # Most servers do not output to stdout/stderr while serving 
+    # requests so there is no performance hit from disabling 
+    # output buffering. This disabling is necessary to 
+    # a) allow TFB to show output in real time and b) avoid losing 
+    # output in the buffer when the testrunner processes are forcibly 
+    # killed
+    # 
+    # See http://www.pixelbeat.org/programming/stdio_buffering/
+    # See https://blogs.gnome.org/markmc/2013/06/04/async-io-and-python/
+    # See http://eyalarubas.com/python-subproc-nonblock.html
+    command = 'sudo -u %s -E -H stdbuf -o0 -e0 bash -ex %s.sh' % (self.benchmarker.runner_user, self.setup_file)
+    
+    debug_command = '''\
+      export FWROOT=%s && \\
+      export TROOT=%s && \\
+      export IROOT=%s && \\
+      export DBHOST=%s && \\
+      export MAX_THREADS=%s && \\
+      cd %s && \\
+      %s''' % (self.fwroot, 
+        self.directory, 
+        self.install_root, 
+        self.database_host, 
+        self.benchmarker.threads, 
+        self.directory,
+        command)
+    logging.info("To run %s manually, copy/paste this:\n%s", self.name, debug_command)
+
+
+    def tee_output(prefix, line):
+      # Needs to be one atomic write
+      # Explicitly use UTF-8 as it's the most common framework output 
+      # TODO improve encoding handling 
+      line = prefix.encode('utf-8') + line
+
+      # Log to current terminal
+      sys.stdout.write(line)
+      sys.stdout.flush()
+      # logging.error("".join([prefix, line]))
+
+      out.write(line)
+      out.flush()
 
 
-      try:
-        subprocess.check_call(command, cwd=self.directory, 
-          shell=True, stderr=errout, stdout=out)
-        retcode = 0
-      except Exception:
-        logging.exception("Failure running setup.sh")
-        retcode = 1
-    with open('temp', 'r') as errout:
-      # Read out temp error output in its entirety
-      body = errout.read()
-      if len(body) > 0:
-        # Log it to the user.
-        logging.error(body)
-        # Log it to our err.txt file
-        err.write(body)
-    # We are done with our temp file - delete it
-    os.remove('temp')
+    # Start the setup.sh command
+    p = subprocess.Popen(command, cwd=self.directory, 
+          shell=True, stdout=subprocess.PIPE, 
+          stderr=subprocess.STDOUT)
+    nbsr = setup_util.NonBlockingStreamReader(p.stdout, 
+      "%s: %s.sh and framework processes have terminated" % (self.name, self.setup_file))
+
+    # Setup a timeout
+    timeout = datetime.now() + timedelta(minutes = 20)
+    time_remaining = timeout - datetime.now()
+
+    # Flush output until setup.sh work is finished. This is 
+    # either a) when setup.sh exits b) when the port is bound
+    # c) when we run out of time. Note that 'finished' doesn't 
+    # guarantee setup.sh process is dead - the OS may choose to make 
+    # setup.sh a zombie process if it still has living children
+    #
+    # Note: child processes forked (using &) will remain alive 
+    # after setup.sh has exited. They will have inherited the 
+    # stdout/stderr descriptors and will be directing their 
+    # output to the pipes. 
+    #
+    prefix = "Setup %s: " % self.name
+    while not (p.poll() 
+      or self.benchmarker.is_port_bound(self.port)
+      or time_remaining.total_seconds() < 0):
+      
+      # The conditions above are slow to check, so 
+      # we will delay output substantially if we only
+      # print one line per condition check. 
+      # Adding a tight loop here mitigates the effect, 
+      # ensuring that most of the output directly from 
+      # setup.sh is sent to tee_output before the outer
+      # loop exits and prints things like "setup.sh exited"
+      # 
+      for i in xrange(10):
+        try:
+          line = nbsr.readline(0.05)
+          if line:
+            tee_output(prefix, line)
+        except setup_util.EndOfStream:
+          tee_output(prefix, "Setup has terminated\n")
+          break
+      time_remaining = timeout - datetime.now()
+
+    # Did we time out?
+    if time_remaining.total_seconds() < 0: 
+      tee_output(prefix, "%s.sh timed out!! Aborting...\n" % self.setup_file)
+      p.kill()
+      return 1
+
+    # What's our return code? 
+    # If setup.sh has terminated, use that code
+    # Otherwise, detect if the port was bound
+    retcode = (p.poll() or 0 if self.benchmarker.is_port_bound(self.port) else 1)
+    if p.poll():
+      tee_output(prefix, "%s.sh process exited with %s\n" % (self.setup_file, p.poll()))
+    elif self.benchmarker.is_port_bound(self.port):
+      tee_output(prefix, "%s.sh exited due to bound port\n" % self.setup_file)
+
+    # Before we return control to the benchmarker, spin up a 
+    # thread to keep an eye on the pipes in case the running 
+    # framework uses stdout/stderr. Once all processes accessing
+    # the subprocess.PIPEs are dead, this thread will terminate. 
+    # Use a different prefix to indicate this is the framework 
+    # speaking
+    prefix = "Server %s: " % self.name
+    def watch_child_pipes(nbsr, prefix):
+      while True:
+        try:
+          line = nbsr.readline(60)
+          if line:
+            tee_output(prefix, line)
+        except setup_util.EndOfStream:
+          tee_output(prefix, "Framework processes have terminated\n")
+          return
+
+    watch_thread = Thread(target = watch_child_pipes,
+      args = (nbsr, prefix))
+    watch_thread.daemon = True
+    watch_thread.start()
+
+    logging.info("Executed %s.sh, returning %s", self.setup_file, retcode)
     os.chdir(previousDir)
     os.chdir(previousDir)
 
 
-    # Stop the progress printer
-    stopFlag.set()
-
-    logging.info("Executed %s.sh", self.setup_file)
-
     return retcode
     return retcode
   ############################################################
   ############################################################
   # End start
   # End start

+ 1 - 69
toolset/run-ci.py

@@ -526,75 +526,7 @@ if __name__ == "__main__":
     log.critical("Unknown error")
     log.critical("Unknown error")
     print traceback.format_exc()
     print traceback.format_exc()
     retcode = 1
     retcode = 1
-  finally:  # Ensure that logs are printed
-    
-    # Only print logs if we ran a verify
-    if mode != 'verify':
-      sys.exit(retcode)
-
-    # Only print logs if we actually did something
-    if os.path.isfile('.run-ci.should_not_run'):
-      sys.exit(retcode)
-
-    log.error("Running inside Travis-CI, so I will print err and out to console...")
-    
-    for name in runner.names:
-      log.error("Test %s", name)
-      try:
-        log.error("Here is ERR:")
-        with open("results/ec2/latest/logs/%s/err.txt" % name, 'r') as err:
-          for line in err:
-            log.info(line.rstrip('\n'))
-      except IOError:
-        log.error("No ERR file found")
-
-      try:
-        log.error("Here is OUT:")
-        with open("results/ec2/latest/logs/%s/out.txt" % name, 'r') as out:
-          for line in out:
-            log.info(line.rstrip('\n'))
-      except IOError:
-        log.error("No OUT file found")
-
-    log.error("Running inside Travis-CI, so I will print a copy of the verification summary")
-
-    results = None
-    try:
-      with open('results/ec2/latest/results.json', 'r') as f:
-        results = json.load(f)
-    except IOError:
-      log.critical("No results.json found, unable to print verification summary") 
-      sys.exit(retcode)
-
-    target_dir = setup_util.get_fwroot() + '/frameworks/' + testdir
-    dirtests = [t for t in gather_tests() if t.directory == target_dir]
-
-    # Normally you don't have to use Fore.* before each line, but 
-    # Travis-CI seems to reset color codes on newline (see travis-ci/travis-ci#2692)
-    # or stream flush, so we have to ensure that the color code is printed repeatedly
-    prefix = Fore.CYAN
-    for line in header("Verification Summary", top='=', bottom='').split('\n'):
-      print prefix + line
-
-    for test in dirtests:
-      print prefix + "| Test: %s" % test.name
-      if test.name not in runner.names:
-        print prefix + "|      " + Fore.YELLOW + "Unable to verify in Travis-CI"
-      elif test.name in results['verify'].keys():
-        for test_type, result in results['verify'][test.name].iteritems():
-          if result.upper() == "PASS":
-            color = Fore.GREEN
-          elif result.upper() == "WARN":
-            color = Fore.YELLOW
-          else:
-            color = Fore.RED
-          print prefix + "|       " + test_type.ljust(11) + ' : ' + color + result.upper()
-      else:
-        print prefix + "|      " + Fore.RED + "NO RESULTS (Did framework launch?)"
-    print prefix + header('', top='', bottom='=') + Style.RESET_ALL
-
-
+  finally:
     sys.exit(retcode)
     sys.exit(retcode)
 
 
-
 # vim: set sw=2 ts=2 expandtab
 # vim: set sw=2 ts=2 expandtab

+ 34 - 44
toolset/setup/linux/installer.py

@@ -107,22 +107,38 @@ class Installer:
       previousDir = os.getcwd()
       previousDir = os.getcwd()
       os.chdir(test_dir)
       os.chdir(test_dir)
 
 
-      # Load benchmark_profile file
-      profile="$FWROOT/config/benchmark_profile"
-      setup_util.replace_environ(config=profile, 
+      # Load environment
+      setup_util.replace_environ(config='$FWROOT/config/benchmark_profile', 
         command='export TROOT=%s && export IROOT=%s' %
         command='export TROOT=%s && export IROOT=%s' %
         (test_dir, test_install_dir))
         (test_dir, test_install_dir))
 
 
-      # Run test installation script
-      #   FWROOT - Path of the FwBm root
-      #   IROOT  - Path of this test's install directory
+      # Run the install.sh script for the test as the "testrunner" user
+      # 
+      # `sudo` - Switching user requires superuser privs
+      #   -u [username] The username
+      #   -E Preserves the current environment variables
+      #   -H Forces the home var (~) to be reset to the user specified
       #   TROOT  - Path to this test's directory 
       #   TROOT  - Path to this test's directory 
-      # Note: Cannot use ''' for newlines here or the script
-      # passed to `bash -c` will fail.
-      self.__run_command('sudo -u %s -E -H bash -c "export TROOT=%s && export IROOT=%s && source %s && source %s"' % 
-        (self.benchmarker.runner_user, test_dir, test_install_dir, 
-          bash_functions_path, test_install_file),
-          cwd=test_install_dir)
+      #   IROOT  - Path of this test's install directory
+      # TODO export bash functions and call install.sh directly
+      command = 'sudo -u %s -E -H bash -c "source %s && source %s"' % (
+        self.benchmarker.runner_user, 
+        bash_functions_path, 
+        test_install_file)
+
+      debug_command = '''\
+        export FWROOT=%s && \\
+        export TROOT=%s && \\
+        export IROOT=%s && \\
+        cd $IROOT && \\
+        %s''' % (self.fwroot, 
+          test_dir, 
+          test_install_dir,
+          command)
+      logging.info("To run installation manually, copy/paste this:\n%s", debug_command)
+
+      # Run test installation script
+      self.__run_command(command, cwd=test_install_dir)
 
 
       # Move back to previous directory
       # Move back to previous directory
       os.chdir(previousDir)
       os.chdir(previousDir)
@@ -148,47 +164,21 @@ class Installer:
   ############################################################
   ############################################################
   # __run_command
   # __run_command
   ############################################################
   ############################################################
-  def __run_command(self, command, send_yes=False, cwd=None, retry=False):
+  def __run_command(self, command, send_yes=False, cwd=None):
     if cwd is None: 
     if cwd is None: 
         cwd = self.install_dir
         cwd = self.install_dir
 
 
-    if retry:
-      max_attempts = 5
-    else:
-      max_attempts = 1
-    attempt = 1
-    delay = 0
     if send_yes:
     if send_yes:
       command = "yes yes | " + command
       command = "yes yes | " + command
         
         
     rel_cwd = setup_util.path_relative_to_root(cwd)
     rel_cwd = setup_util.path_relative_to_root(cwd)
     print("INSTALL: %s (cwd=$FWROOT/%s)" % (command, rel_cwd))
     print("INSTALL: %s (cwd=$FWROOT/%s)" % (command, rel_cwd))
 
 
-    while attempt <= max_attempts:
-      error_message = ""
-      try:
-
-        # Execute command.
-        subprocess.check_call(command, shell=True, cwd=cwd, executable='/bin/bash')
-        break  # Exit loop if successful.
-      except:
-        exceptionType, exceptionValue, exceptionTraceBack = sys.exc_info()
-        error_message = "".join(traceback.format_exception_only(exceptionType, exceptionValue))
-
-      # Exit if there are no more attempts left.
-      attempt += 1
-      if attempt > max_attempts:
-        break
-
-      # Delay before next attempt.
-      if delay == 0:
-        delay = 5
-      else:
-        delay = delay * 2
-      print("Attempt %s/%s starting in %s seconds." % (attempt, max_attempts, delay))
-      time.sleep(delay)
-
-    if error_message:
+    try:
+      subprocess.check_call(command, shell=True, cwd=cwd, executable='/bin/bash')
+    except:
+      exceptionType, exceptionValue, exceptionTraceBack = sys.exc_info()
+      error_message = "".join(traceback.format_exception_only(exceptionType, exceptionValue))
       self.__install_error(error_message)
       self.__install_error(error_message)
   ############################################################
   ############################################################
   # End __run_command
   # End __run_command

+ 55 - 1
toolset/setup/linux/setup_util.py

@@ -1,8 +1,62 @@
 import re
 import re
 import os
 import os
+import sys
 import subprocess
 import subprocess
 import platform
 import platform
 
 
+from threading import Thread
+from Queue import Queue, Empty
+
+class NonBlockingStreamReader:
+  '''
+  Enables calling readline in a non-blocking manner with a blocking stream, 
+  such as the ones returned from subprocess.Popen
+
+  Originally written by Eyal Arubas, who granted permission to use this inside TFB
+  See http://eyalarubas.com/python-subproc-nonblock.html
+  '''
+  def __init__(self, stream, eof_message = None):
+    '''
+    stream: the stream to read from.
+            Usually a process' stdout or stderr.
+    eof_message: A message to print to stdout as soon
+      as the stream's end is reached. Useful if you
+      want to track the exact moment a stream terminates
+    '''
+
+    self._s = stream
+    self._q = Queue()
+    self._eof_message = eof_message
+    self._poisonpill = 'MAGIC_POISONPILL_STRING'
+
+    def _populateQueue(stream, queue):
+      while True:
+        line = stream.readline()
+        if line: # 'data\n' or '\n'
+          queue.put(line)
+        else:    # '' e.g. EOF
+          if self._eof_message:
+            sys.stdout.write(self._eof_message + '\n')
+          queue.put(self._poisonpill)
+          return
+
+    self._t = Thread(target = _populateQueue,
+            args = (self._s, self._q))
+    self._t.daemon = True
+    self._t.start()
+
+  def readline(self, timeout = None):
+    try:
+      line = self._q.get(block = timeout is not None,
+        timeout = timeout)
+      if line == self._poisonpill: 
+        raise EndOfStream
+      return line
+    except Empty:
+      return None
+
+class EndOfStream(Exception): pass
+
 # Replaces all text found using the regular expression to_replace with the supplied replacement.
 # Replaces all text found using the regular expression to_replace with the supplied replacement.
 def replace_text(file, to_replace, replacement):
 def replace_text(file, to_replace, replacement):
     with open(file, "r") as conf:
     with open(file, "r") as conf:
@@ -29,7 +83,7 @@ def replace_environ(config=None, root=None, print_result=False, command='true'):
     
     
         # Clean up our current environment, preserving some important items
         # Clean up our current environment, preserving some important items
         mini_environ = {}
         mini_environ = {}
-        for envname in ['HOME', 'PATH', 'USER', 'LD_LIBRARY_PATH', 'PYTHONPATH', 'FWROOT', 'TRAVIS']:
+        for envname in ['HOME', 'PATH', 'LANG', 'USER', 'LD_LIBRARY_PATH', 'PYTHONPATH', 'FWROOT', 'TRAVIS']:
           if envname in os.environ:
           if envname in os.environ:
             mini_environ[envname] = os.environ[envname]
             mini_environ[envname] = os.environ[envname]
         for key in os.environ:
         for key in os.environ: