
Merge pull request #1145 from hamiltont/hotfix-1

Toolset: Fix bugs in master
Hamilton Turner 10 years ago
parent commit f187ec6975

toolset/benchmark/benchmarker.py  (+61 -26)

@@ -18,6 +18,7 @@ import sys
 import logging
 import socket
 import threading
+import textwrap
 from pprint import pprint
 
 from multiprocessing import Process
@@ -536,10 +537,16 @@ class Benchmarker:
           time.sleep(10)
 
         if self.__is_port_bound(test.port):
-          self.__write_intermediate_results(test.name, "port " + str(test.port) + " is not available before start")
-          err.write(header("Error: Port %s is not available, cannot start %s" % (test.port, test.name)))
+          err.write(header("Error: Port %s is not available, attempting to recover" % test.port))
           err.flush()
-          return exit_with_code(1)
+          print "Error: Port %s is not available, attempting to recover" % test.port
+          self.__forciblyEndPortBoundProcesses(test.port, out, err)
+          if self.__is_port_bound(test.port):
+            self.__write_intermediate_results(test.name, "port " + str(test.port) + " is not available before start")
+            err.write(header("Error: Port %s is not available, cannot start %s" % (test.port, test.name)))
+            err.flush()
+            print "Error: Unable to recover port, cannot start test"
+            return exit_with_code(1)
 
         result = test.start(out, err)
         if result != 0: 
@@ -681,41 +688,67 @@ class Benchmarker:
 
   def __forciblyEndPortBoundProcesses(self, test_port, out, err):
     p = subprocess.Popen(['sudo', 'netstat', '-lnp'], stdout=subprocess.PIPE)
-    out, err = p.communicate()
-    for line in out.splitlines():
-      if 'tcp' in line:
+    (ns_out, ns_err) = p.communicate()
+    for line in ns_out.splitlines():
+      # Handles tcp, tcp6, udp, udp6
+      if line.startswith('tcp') or line.startswith('udp'):
         splitline = line.split()
-        port = splitline[3].split(':')
-        port = int(port[len(port) - 1].strip())
+        port = int(splitline[3].split(':')[-1])
+        pid  = splitline[-1].split('/')[0]
+
+        # Sometimes the last column is just a dash
+        if pid == '-':
+          continue
+
         if port > 6000:
+          ps = subprocess.Popen(['ps','p',pid], stdout=subprocess.PIPE)
+          (out_6000, err_6000) = ps.communicate()
           err.write(textwrap.dedent(
-        """
-        A port that shouldn't be open is open. See the following line for netstat output.
-        {splitline}
-        """.format(splitline=splitline)))
+          """
+          Port {port} should not be open. See the following lines for information
+          {netstat}
+          {ps}
+          """.format(port=port, netstat=line, ps=out_6000)))
           err.flush()
+
         if port == test_port:
+          err.write( header("Error: Test port %s should not be open" % port, bottom='') )
           try:
-            pid = splitline[6].split('/')[0].strip()
             ps = subprocess.Popen(['ps','p',pid], stdout=subprocess.PIPE)
             # Store some info about this process
-            proc = ps.communicate()
-            os.kill(int(pid), 15)
+            (out_15, err_15) = ps.communicate()
+            children = subprocess.Popen(['ps','--ppid',pid,'-o','pid'], stdout=subprocess.PIPE)
+            (out_children, err_children) = children.communicate()
+
+            err.write("  Sending SIGTERM to this process:\n  %s\n" % out_15)
+            err.write("  Also expecting these child processes to die:\n  %s\n" % out_children)
+
+            subprocess.check_output(['sudo','kill',pid])
             # Sleep for 10 sec; kill can be finicky
             time.sleep(10)
+
             # Check that PID again
             ps = subprocess.Popen(['ps','p',pid], stdout=subprocess.PIPE)
-            dead = ps.communicate()
-            if dead in proc:
-              os.kill(int(pid), 9)
-          except OSError:
-            out.write( textwrap.dedent("""
-              -----------------------------------------------------
-                Error: Could not kill pid {pid}
-              -----------------------------------------------------
-              """.format(pid=str(pid))) )
-            # This is okay; likely we killed a parent that ended
-            # up automatically killing this before we could.
+            (out_9, err_9) = ps.communicate()
+            if len(out_9.splitlines()) != 1:  # One line for the header row
+              err.write("  Process is still alive!\n")
+              err.write("  Sending SIGKILL to this process:\n   %s\n" % out_9)
+              subprocess.check_output(['sudo','kill','-9', pid])
+            else:
+              err.write("  Process has been terminated\n")
+
+            # Ensure all children are dead
+            c_pids = [c_pid.strip() for c_pid in out_children.splitlines()[1:]]
+            for c_pid in c_pids:
+              ps = subprocess.Popen(['ps','p',c_pid], stdout=subprocess.PIPE)
+              (out_9, err_9) = ps.communicate()
+              if len(out_9.splitlines()) != 1:  # One line for the header row
+                err.write("  Child Process %s is still alive, sending SIGKILL\n" % c_pid)
+                subprocess.check_output(['sudo','kill','-9', c_pid])
+          except Exception as e: 
+            err.write( "  Error: Unknown exception %s\n" % e )
+          err.write( header("Done attempting to recover port %s" % port, top='') )
+
 
   ############################################################
   # __parse_results
@@ -905,6 +938,8 @@ class Benchmarker:
         args['types'] = { args['type'] : types[args['type']] }
     del args['type']
 
+    args['max_threads'] = args['threads']
+
     self.__dict__.update(args)
     # pprint(self.__dict__)
 

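The recovery path added in `__forciblyEndPortBoundProcesses` works by shelling out to `sudo netstat -lnp`, mapping each listening socket to the PID that owns it, and then escalating from SIGTERM to SIGKILL. A minimal standalone sketch of that lookup step, assuming the same netstat output format (the `find_pid_on_port` helper is hypothetical, not part of the toolset):

```python
import subprocess

def find_pid_on_port(port):
    # netstat -lnp lists listening sockets with an owning "pid/program" column;
    # sudo is needed to see processes that belong to other users
    ns = subprocess.Popen(['sudo', 'netstat', '-lnp'], stdout=subprocess.PIPE)
    (ns_out, _) = ns.communicate()
    for line in ns_out.splitlines():
        # Rows of interest look like:
        # tcp  0  0 0.0.0.0:8080  0.0.0.0:*  LISTEN  1234/python
        if not (line.startswith('tcp') or line.startswith('udp')):
            continue
        cols = line.split()
        bound_port = int(cols[3].split(':')[-1])
        owner = cols[-1].split('/')[0]  # sometimes just '-'
        if owner != '-' and bound_port == port:
            return owner
    return None
```

Given a PID found this way, the code above first sends a plain `sudo kill` (SIGTERM), sleeps ten seconds, re-checks with `ps p <pid>`, and only falls back to `kill -9` if the process or one of its children is still alive.
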
toolset/benchmark/framework_test.py  (+16 -13)

@@ -297,7 +297,9 @@ class FrameworkTest:
           pass
 
       if test.passed:
-        if test.requires_db:
+        if test_type == 'plaintext': # One special case
+          remote_script = self.__generate_concurrency_script(test.get_url(), self.port, test.accept_header, levels=[256,1024,4096,16384], pipeline="16")
+        elif test.requires_db:
           remote_script = self.__generate_query_script(test.get_url(), self.port, test.accept_header)
         else:
           remote_script = self.__generate_concurrency_script(test.get_url(), self.port, test.accept_header)
@@ -306,9 +308,10 @@ class FrameworkTest:
         self.__begin_logging(test_type)
         
         # Run the benchmark 
-        p = subprocess.Popen(self.benchmarker.client_ssh_string.split(" "), stdin=subprocess.PIPE, stdout=output_file, stderr=err)
-        p.communicate(remote_script)
-        err.flush()
+        with open(output_file, 'w') as raw_file:
+          p = subprocess.Popen(self.benchmarker.client_ssh_string.split(" "), stdin=subprocess.PIPE, stdout=raw_file, stderr=err)
+          p.communicate(remote_script)
+          err.flush()
 
         # End resource usage metrics collection
         self.__end_logging()
@@ -454,13 +457,13 @@ class FrameworkTest:
   # specifically works for the variable concurrency tests (JSON
   # and DB)
   ############################################################
-  def __generate_concurrency_script(self, url, port, accept_header, wrk_command="wrk", intervals=[], pipeline=""):
-    if len(intervals) == 0:
-      intervals = self.benchmarker.concurrency_levels
+  def __generate_concurrency_script(self, url, port, accept_header, wrk_command="wrk", levels=[], pipeline=""):
+    if len(levels) == 0:
+      levels = self.benchmarker.concurrency_levels
     headers = self.headers_template.format(accept=accept_header)
-    return self.concurrency_template.format(max_concurrency=self.benchmarker.max_concurrency, 
-      max_threads=self.benchmarker.max_threads, name=self.name, duration=self.benchmarker.duration, 
-      interval=" ".join("{}".format(item) for item in intervals), 
+    return self.concurrency_template.format(max_concurrency=max(self.benchmarker.concurrency_levels), 
+      max_threads=self.benchmarker.threads, name=self.name, duration=self.benchmarker.duration, 
+      levels=" ".join("{}".format(item) for item in levels), 
       server_host=self.benchmarker.server_host, port=port, url=url, headers=headers, wrk=wrk_command,
       pipeline=pipeline)
 
@@ -472,9 +475,9 @@ class FrameworkTest:
   ############################################################
   def __generate_query_script(self, url, port, accept_header):
     headers = self.headers_template.format(accept=accept_header)
-    return self.query_template.format(max_concurrency=self.benchmarker.max_concurrency, 
-      max_threads=self.benchmarker.max_threads, name=self.name, duration=self.benchmarker.duration, 
-      interval=" ".join("{}".format(item) for item in self.benchmarker.query_intervals), 
+    return self.query_template.format(max_concurrency=max(self.benchmarker.concurrency_levels), 
+      max_threads=self.benchmarker.threads, name=self.name, duration=self.benchmarker.duration, 
+      levels=" ".join("{}".format(item) for item in self.benchmarker.query_levels), 
       server_host=self.benchmarker.server_host, port=port, url=url, headers=headers)
 
   ############################################################
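
The renamed `levels` parameter is what lets the plaintext test override the configured concurrency levels with [256, 1024, 4096, 16384] and a pipeline depth of 16, while every other test type falls through to `benchmarker.concurrency_levels`. A rough sketch of how the substitution behaves, with a made-up template string standing in for the project's actual `concurrency_template`:

```python
# Illustrative template only -- the real concurrency_template ships with the toolset
TEMPLATE = ("wrk -t {max_threads} -c {max_concurrency} -d {duration}s -H '{headers}' "
            "http://{server_host}:{port}{url}  # levels: {levels} pipeline: {pipeline}")

def generate_concurrency_script(benchmarker, url, port, headers, levels=None, pipeline=""):
    if not levels:
        # Default case: use the concurrency levels configured on the benchmarker
        levels = benchmarker.concurrency_levels
    return TEMPLATE.format(
        max_concurrency=max(benchmarker.concurrency_levels),  # replaces the old max_concurrency setting
        max_threads=benchmarker.threads,                      # was benchmarker.max_threads
        duration=benchmarker.duration,
        levels=" ".join(str(level) for level in levels),
        server_host=benchmarker.server_host, port=port, url=url,
        headers=headers, pipeline=pipeline)
```

The plaintext branch calls this with explicit `levels` and `pipeline="16"`; JSON and DB tests pass neither and get the defaults.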

toolset/benchmark/test_types/db_type.py  (+5 -1)

@@ -73,7 +73,11 @@ class DBTestType(FrameworkTestType):
     if "id" not in db_object:
       problems.append( ('fail', "Response has no 'id' key", url) ) 
     if "randomnumber" not in db_object:
-      problems.append( ('fail', "Response has no 'randomNumber' key", url) ) 
+      problems.append( ('fail', "Response has no 'randomNumber' key", url) )
+    
+    # Ensure we can continue on to use these keys
+    if "id" not in db_object or "randomnumber" not in db_object:
+      return problems
 
     try:
       float(db_object["id"])

toolset/benchmark/test_types/framework_test_type.py  (+4 -3)

@@ -64,17 +64,18 @@ class FrameworkTestType:
     # Don't use -f so that the HTTP response code is ignored.
     # Use -sS to hide progress bar, but show errors.
     print "Accessing URL %s" % url
+    self.err.write("Accessing URL %s \n" % url)
+    self.out.write("Accessing URL %s \n" % url)
     p = subprocess.Popen(["curl", "-m", "15", "-i", "-sS", url], stderr=PIPE, stdout=PIPE)
     (out, err) = p.communicate()
     self.err.write(err+'\n')
-    self.out.write(out+'\n')
+    self.out.write("Response: \n\"" + out+ "\"\n")
     if p.returncode != 0:
       return None
     # Get response body
     p = subprocess.Popen(["curl", "-m", "15", "-s", url], stdout=PIPE, stderr=PIPE)
     (out, err) = p.communicate()
-    self.err.write(err+'\n')
-    self.out.write(out+'\n')
+    print "  Response (trimmed to 40 bytes): \"%s\"" % out[:40]
     return out
   
   def verify(self, base_url):
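
For context, `_curl` issues two requests: the first with `-i` so the status line and headers end up in the logs, the second without it so the raw body can be returned for verification. A condensed sketch of that pattern as modified above (the standalone `fetch_body` name is made up for illustration):

```python
import subprocess
from subprocess import PIPE

def fetch_body(url):
    # First request: -i includes status line and headers, -sS hides the
    # progress bar but still reports errors; this pass is only for logging
    p = subprocess.Popen(["curl", "-m", "15", "-i", "-sS", url], stdout=PIPE, stderr=PIPE)
    (out, err) = p.communicate()
    if p.returncode != 0:
        return None
    # Second request: body only, which is what the verifier actually inspects
    p = subprocess.Popen(["curl", "-m", "15", "-s", url], stdout=PIPE, stderr=PIPE)
    (out, err) = p.communicate()
    print "  Response (trimmed to 40 bytes): \"%s\"" % out[:40]
    return out
```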

toolset/benchmark/test_types/plaintext_type.py  (+2 -2)

@@ -23,8 +23,8 @@ class PlaintextTestType(FrameworkTestType):
       return [('fail', "Could not find 'Hello, World!' in response", url)]
 
     if len("hello, world!") < len(body):
-      return [('warn', '''Server is returning more data than is required.
-        This may negatively affect benchmark performance''', url)]
+      return [('warn', """Server is returning %s more bytes than are required.
+        This may negatively affect benchmark performance""" % (len(body) - len("hello, world!")), url)]
 
     return [('pass', '', url)]
 

toolset/benchmark/test_types/query_type.py  (+9 -2)

@@ -30,11 +30,18 @@ class QueryTestType(DBTestType):
     body = self._curl(url + '0')
     problems += self._verifyQueryList(1, body, url + '0', 'warn')
 
+    # Note: A number of tests fail here because they only parse for 
+    # a number and crash on 'foo'. For now we only warn about this
     body = self._curl(url + 'foo')
-    problems += self._verifyQueryList(1, body, url + 'foo')
+    if body is None:
+      problems += [('warn','No response (this will be a failure in future rounds, please fix)', url)]
+    elif len(body) == 0:
+      problems += [('warn','Empty response (this will be a failure in future rounds, please fix)', url)]
+    else:
+      problems += self._verifyQueryList(1, body, url + 'foo', 'warn')
 
     body = self._curl(url + '501')
-    problems += self._verifyQueryList(500, body, url + '501')
+    problems += self._verifyQueryList(500, body, url + '501', 'warn')
 
     if len(problems) == 0:
       return [('pass','',url + '2'),

toolset/run-tests.py  (+6 -6)

@@ -180,18 +180,18 @@ def main(argv=None):
         print 'Configuration options: '
         pprint(vars(args))
 
-
-
     benchmarker = Benchmarker(vars(args))
 
     # Run the benchmarker in the specified mode
-    if benchmarker.list_tests:
+    #   Do not use benchmarker variables for these checks, 
+    #   they are either str or bool based on the python version
+    if args.list_tests:
       benchmarker.run_list_tests()
-    elif benchmarker.list_test_metadata:
+    elif args.list_test_metadata:
       benchmarker.run_list_test_metadata()
-    elif benchmarker.parse != None:
+    elif args.parse != None:
       benchmarker.parse_timestamp()
-    elif not benchmarker.install_only:
+    elif not args.install_only:
       return benchmarker.run()
 
 if __name__ == "__main__":
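
The switch from `benchmarker.list_tests` to `args.list_tests` sidesteps the truthiness trap hinted at in the comment: if the flag copied onto the benchmarker comes through as the string 'False' rather than a boolean, `if benchmarker.list_tests:` takes the wrong branch, since any non-empty string is truthy. A tiny illustration with made-up values (not the toolset's actual config handling):

```python
# A flag that arrived as a string instead of a bool
copied_flag = 'False'
parsed_flag = False

if copied_flag:
    print "the string 'False' is truthy, so this branch runs by mistake"
if not parsed_flag:
    print "a real boolean False behaves as expected"
```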