Browse Source

Merge branch 'refactor_setup' into travis

Hamilton Turner 11 years ago
parent
commit
07fcec538c
5 changed files with 133 additions and 57 deletions
  1. 6 14
      go/setup.py
  2. 12 4
      toolset/benchmark/benchmarker.py
  3. 9 5
      toolset/benchmark/test_runner.py
  4. 105 33
      toolset/benchmark/utils.py
  5. 1 1
      toolset/run-tests.py

+ 6 - 14
go/setup.py

@@ -4,7 +4,7 @@ import os
 import setup_util
 from test_runner import TestRunner
 
-class Bar(TestRunner):
+class Go(TestRunner):
 
   def start(self):
     setup_util.replace_text("go/src/hello/hello.go", "tcp\(.*:3306\)", "tcp(" + self.database_host + ":3306)")
@@ -12,16 +12,12 @@ class Bar(TestRunner):
       #subprocess.call("rmdir /s /q pkg\\windows_amd64", shell=True, cwd="go")
       #subprocess.call("rmdir /s /q src\\github.com", shell=True, cwd="go")
       #subprocess.call("del /s /q /f bin\\hello.exe", shell=True, cwd="go")
-      subprocess.call("set GOPATH=C:\\FrameworkBenchmarks\\go&& go get ./...", shell=True, cwd="go", stderr=errfile, stdout=logfile)
-      subprocess.Popen("setup.bat", shell=True, cwd="go", stderr=errfile, stdout=logfile)
+      self.sh("go get ./...")
+      self.sh("setup.bat")
       return 0
     
-    self.sh("which go")
-    self.sh("rm -rf src/github.com")
-    self.sh("ls src/github.com/go-sql-driver/mysql")
     self.sh("go get ./...")
-    self.sh("ls src/github.com/go-sql-driver/mysql")
-    self.sh_async("go run -x -v src/hello/hello.go")
+    self.pid = self.sh_async("go run -x -v src/hello/hello.go")
     return 0
 
   def stop(self):
@@ -29,10 +25,6 @@ class Bar(TestRunner):
       subprocess.call("taskkill /f /im go.exe > NUL", shell=True, stderr=errfile, stdout=logfile)
       subprocess.call("taskkill /f /im hello.exe > NUL", shell=True, stderr=errfile, stdout=logfile)
       return 0
-    p = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)
-    out, err = p.communicate()
-    for line in out.splitlines():
-      if 'hello' in line:
-        pid = int(line.split(None, 2)[1])
-        os.kill(pid, 15)
+    # Kill off the entire go process group
+    self.sh_pkill(self.pid)
     return 0

+ 12 - 4
toolset/benchmark/benchmarker.py

@@ -488,10 +488,12 @@ class Benchmarker:
     log.info("__run_tests")
     log.debug("__run_tests with __name__ = %s",__name__)
 
+    error_happened = False
     if self.os.lower() == 'windows':
       log.info("Executing __run_tests on Windows")
       for test in tests:
-        self.__run_test(test)
+        if self.__run_test(test) != 0:
+          error_happened = True
     else:
       log.info("Executing __run_tests on Linux")
       for test in tests:
@@ -534,7 +536,12 @@ class Benchmarker:
               pass
             subprocess.call("sudo pkill -SIGKILL -s %s" % test_process.pid, shell=True)
           log.handlers = []  # Clean up handlers left by __run_test
+          if test_process.exitcode != 0:
+            error_happened = True
     log.info("End __run_tests")
+    if error_happened:
+      return 1
+    return 0
 
   ############################################################
   # End __run_tests
@@ -645,7 +652,7 @@ class Benchmarker:
       if self.__is_port_bound(test.port):
         self.__write_intermediate_results(test.name, "port %s is not available before start" % test.port)
         log.error(Header("Error: Port %s is not available, cannot start %s" % (test.port, test.name)))
-        return
+        return 1
 
       result = test.start(log)
       if result != 0: 
@@ -654,7 +661,7 @@ class Benchmarker:
         log.error("ERROR: Problem starting %s", test.name)
         log.error(Header("Stopped %s" % test.name))
         self.__write_intermediate_results(test.name,"<setup.py>#start() returned non-zero")
-        return
+        return 1
       
       log.info("Sleeping for %s", self.sleep)
       time.sleep(self.sleep)
@@ -676,7 +683,7 @@ class Benchmarker:
       if self.__is_port_bound(test.port):
         self.__write_intermediate_results(test.name, "port %s was not released by stop" % test.port)
         log.error(Header("Error: Port %s was not released by stop %s" % (test.port, test.name)))
-        return
+        return 1
 
       log.info(Header("Stopped %s" % test.name))
       time.sleep(5)
@@ -701,6 +708,7 @@ class Benchmarker:
         log.error("%s", e)
         log.error("%s", sys.exc_info()[:2])
         log.debug("Subprocess Error Details", exc_info=True)
+      return 1
   ############################################################
   # End __run_test
   ############################################################

+ 9 - 5
toolset/benchmark/test_runner.py

@@ -18,7 +18,7 @@ class TestRunner:
     self.dir = test.directory
     
     (out, err) = WrapLogger(logger, logging.INFO), WrapLogger(logger, logging.ERROR)
-    self.utils = ShellUtils(self.dir, out, err)
+    self.utils = ShellUtils(self.dir, None, None, logger)
 
   def start(self):
     raise NotImplementedError()
@@ -26,11 +26,15 @@ class TestRunner:
   def stop(self):
     raise NotImplementedError()
 
-  def sh(self, command, **kwargs):
-    self.utils.sh(command, **kwargs)
+  def sh(self, *args, **kwargs):
+    self.utils.sh(*args, **kwargs)
 
-  def sh_async(self, command, **kwargs):
-    self.utils.sh_async(command, **kwargs)
+  def sh_async(self, *args, **kwargs):
+    return self.utils.sh_async(*args, **kwargs)
+
+  def sh_pkill(self, *args, **kwargs):
+    self.logger.debug("Got %s and %s" % (args, kwargs))
+    self.utils.sh_pkill(*args, **kwargs)
 
   @staticmethod
   def is_parent_of(target_class):

+ 105 - 33
toolset/benchmark/utils.py

@@ -1,13 +1,19 @@
 
 import subprocess
 import time
+import os
+import logging
+import shlex
+
 class ShellUtils():
-  def __init__(self, directory, outfile, errfile):
+  def __init__(self, directory, outfile, errfile, logger=None):
     '''
     outfile: A file-like object to write command output to. 
              Must have write(string) method. Common choices are 
              files, sys.stdout, or WrapLogger objects
     errfile: See outfile 
+    logger : If provided, used instead of outfile/errfile for 
+             finer-grained logging
     '''
     # Advanced notes: outfile and errfile do *not* have to be 
     # thread-safe objects. They are only ever written to from one 
@@ -16,53 +22,76 @@ class ShellUtils():
     self.directory = directory
     self.outfile = outfile
     self.errfile = errfile
+    self.logger = logger
+    self.os = os.name
+  
+  def __write_out(self, message, level=logging.INFO, stream=None):
+    if self.logger:
+      self.logger.log(level, message)
+    elif stream == None:
+      self.outfile.write(message)
+    else:
+      stream.write(message)
+  
+  def __write_err(self, message, level=logging.ERROR):
+    self.__write_out(message, level, stream=self.errfile)
   
   def sh(self, command, **kwargs):
     '''Run a shell command, sending output to outfile and errfile.
     Blocks until command exits'''
     kwargs.setdefault('cwd', self.directory)
     kwargs.setdefault('executable', '/bin/bash')
-    self.outfile.write("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
+    self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
     try:
       output = subprocess.check_output(command, shell=True, stderr=self.errfile, **kwargs)
       if output and output.strip():
-        self.outfile.write("Output:")
-        self.outfile.write(output.rstrip('\n'))
+        self.__write_out("Output:")
+        self.__write_out(output.rstrip('\n'))
       else:
-        self.outfile.write("No Output")
+        self.__write_out("No Output")
     except subprocess.CalledProcessError:
-      self.errfile.write("Command returned non-zero exit code: %s" % command)
+      self.__write_err("Command returned non-zero exit code: %s" % command)
 
-  # TODO modify this to start the subcommand as a new process group, so that 
-  # we can automatically kill the entire group!
-  def sh_async(self, command, initial_logs=True, **kwargs):
-    '''Run a shell command, sending output to outfile and errfile.
-    If intial_logs, prints out logs for a few seconds before returning. '''
-    # TODO add this - '''Continues to send output until command completes'''
+  def sh_async(self, command, group=True, **kwargs):
+    '''
+    Run a shell command, continually sending output to outfile and errfile until 
+    shell process completes
+
+    - If group is set, create a process group that can later be used to kill all subprocesses
+    Returns the pid of the newly created process (or process group)
+    '''
+    
+    # Setup extra args
     kwargs.setdefault('cwd', self.directory)
-    self.outfile.write("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
+    if group:
+      if self.os != 'nt':
+        kwargs.setdefault('preexec_fn', os.setpgrp)
+      else:
+        # TODO if someone could make this work that would be great
+        self.__write_err("Unable to group flag on Windows")
     
-    # Open in line-buffered mode (bufsize=1) because NonBlockingStreamReader uses readline anyway
+    self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
+    # Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
     process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
-    nbsr = NonBlockingStreamReader(process.stdout)
-    nbsr_err = NonBlockingStreamReader(process.stderr) 
-    if initial_logs:
-      time.sleep(8)
-      # TODO put this read into a tight loop to prevent deadlock due to 
-      # filling up OS buffers
-      out = nbsr.read()
-      if len(out) == 0:
-        self.outfile.write("No output")
-      else: 
-        self.outfile.write("Initial Output:")
-        for line in out:
-            self.outfile.write(line.rstrip('\n'))
-        
-      err = nbsr_err.read()
-      if len(err) != 0:
-        self.errfile.write("Initial Error Logs:")
-        for line in err:
-          self.errfile.write(line.rstrip('\n'))
+    NonBlockingStreamLogger(process, self.logger, name=shlex.split(command)[0])
+    return process.pid
+
+  def sh_pkill(self, group_id, name=None, usesudo=False):
+    '''
+    Kill processes that match all the passed arguments
+    Set group_id if you used sh_async
+    Set usesudo only if you started these processes with sudo
+    # TODO - consider os.pgkill?
+    '''
+    command = "pkill "
+    command = "%s -g %s" % (command, group_id)
+
+    if name:
+      command = "%s %s" % (command, name)
+    if usesudo:
+      command = "sudo %s" % command
+    self.sh(command)
+
 
 from threading import Thread
 from Queue import Queue, Empty
@@ -104,6 +133,49 @@ class NonBlockingStreamReader:
         return lines
       lines.append(line)
 
+import logging
+from threading import Thread
+import threading
+# NonBlockingStreamLogger(p, logging.getLogger())
+# p = subprocess.Popen("asdfasdf", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+class NonBlockingStreamLogger:
+  '''Reads from a subprocess' streams and writes them to the 
+  provided logger as long as the subprocess is alive'''
+  def __init__(self, process, logger, logout=logging.INFO, logerr=logging.ERROR, name=None):
+    self.process = process
+    self.logger = logger
+    self.out_level = logout
+    self.err_level = logerr
+    if name:
+      self.prefix = "Process '%s':" % name
+    else: 
+      self.logger.warning("No name provided for process %s", process.pid)
+      self.prefix = "Process '%s':" % process.pid
+      name = process.pid
+    outThread = Thread(target = self._readStream,
+                args = (process.stdout, self.out_level),
+                name = "%s - stdout" % name)
+    outThread.daemon = True
+    outThread.start()
+    errThread = Thread(target = self._readStream,
+                args = (process.stderr, self.err_level),
+                name = "%s - stderr" % name)
+    errThread.daemon = True
+    errThread.start()
+  def _readStream(self, stream, level):
+    self.logger.debug("%s Waiting for output (%s)", self.prefix, threading.current_thread().name)
+    for line in iter(stream.readline, b''):
+      self.logger.log(level, "%s %s", self.prefix, line.rstrip('\n'))
+      
+      # Has process died? 
+      if self.process.poll() == 0:
+        self.logger.debug("%s Death. Reading remainder of stream", self.prefix)
+        remainder = stream.read()
+        for line2 in remainder.split(b'\n'):
+          self.logger.log(level, "%s %s", self.prefix, line2)
+        break
+    self.logger.debug("%s Complete (%s)", self.prefix, threading.current_thread().name)
+    return 0
 import tempfile
 class WrapLogger():
   """

+ 1 - 1
toolset/run-tests.py

@@ -172,7 +172,7 @@ def main(argv=None):
     elif benchmarker.parse != None:
       benchmarker.parse_timestamp()
     else:
-      benchmarker.run()
+      return benchmarker.run()
 
 # Integrate uncaught exceptions into our logging system
 # Note: This doesn't work if the exception happens in a