
Add stats and benchmark resume to toolset

Alex Schneider committed 11 years ago
commit 855f284037

+ 44 - 0
toolset/benchmark/benchmarker.py

@@ -240,6 +240,35 @@ class Benchmarker:
  # End warning_file
  ############################################################

+  ############################################################
+  # get_stats_file(test_name, test_type)
+  # Returns the stats file name for this test_name and
+  # test_type: timestamp/test_type/test_name/stats
+  ############################################################
+  def get_stats_file(self, test_name, test_type):
+    return os.path.join(self.result_directory, self.timestamp, test_type, test_name, "stats")
+  ############################################################
+  # End get_stats_file
+  ############################################################
+
+
+  ############################################################
+  # stats_file(test_name, test_type)
+  # Returns the stats file for this test_name and test_type,
+  # creating its parent directory if needed:
+  # timestamp/test_type/test_name/stats
+  ############################################################
+  def stats_file(self, test_name, test_type):
+    path = self.get_stats_file(test_name, test_type)
+    try:
+      os.makedirs(os.path.dirname(path))
+    except OSError:
+      pass
+    return path
+  ############################################################
+  # End stats_file
+  ############################################################
+  
+
  ############################################################
  # full_results_directory
  ############################################################
@@ -349,6 +378,16 @@ class Benchmarker:
            tests.append(atest)

    tests.sort(key=lambda x: x.name)
+
+    # If the tests have been interrupted somehow, then we want to resume them where we left
+    # off, rather than starting from the beginning
+    if os.path.isfile('current_benchmark.txt'):
+        with open('current_benchmark.txt', 'r') as interrupted_benchmark:
+            interrupt_bench = interrupted_benchmark.read()
+            for index, atest in enumerate(tests):
+                if atest.name == interrupt_bench:
+                    tests = tests[index:]
+                    break
    return tests
  ############################################################
  # End __gather_tests
@@ -478,6 +517,8 @@ class Benchmarker:
    if self.os.lower() == 'windows':
      logging.debug("Executing __run_tests on Windows")
      for test in tests:
+        with open('current_benchmark.txt', 'w') as benchmark_resume_file:
+          benchmark_resume_file.write(test.name)
        self.__run_test(test)
    else:
      logging.debug("Executing __run_tests on Linux")
@@ -489,6 +530,8 @@ class Benchmarker:
              Running Test: {name} ...
            -----------------------------------------------------
            """.format(name=test.name))
+          with open('current_benchmark.txt', 'w') as benchmark_resume_file:
+            benchmark_resume_file.write(test.name)
          test_process = Process(target=self.__run_test, args=(test,))
          test_process.start()
          test_process.join(self.run_test_timeout_seconds)
@@ -497,6 +540,7 @@ class Benchmarker:
             logging.debug("Child process for {name} is still alive. Terminating.".format(name=test.name))
             logging.debug("Child process for {name} is still alive. Terminating.".format(name=test.name))
             self.__write_intermediate_results(test.name,"__run_test timeout (="+ str(self.run_test_timeout_seconds) + " seconds)")
             self.__write_intermediate_results(test.name,"__run_test timeout (="+ str(self.run_test_timeout_seconds) + " seconds)")
             test_process.terminate()
             test_process.terminate()
+    os.remove('current_benchmark.txt')
     logging.debug("End __run_tests.")
     logging.debug("End __run_tests.")
 
 
   ############################################################
   ############################################################
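Note on the resume mechanism introduced in this file: the bookkeeping is just one marker file naming the test currently in flight. It is written before each test starts, consulted by __gather_tests on the next invocation, and removed after a clean run. A minimal standalone sketch of the same idea in Python (run_one and all_tests are hypothetical stand-ins for __run_test and the sorted test list, not names from this commit):

import os

RESUME_FILE = 'current_benchmark.txt'

def run_with_resume(all_tests):
    # If a previous run died mid-benchmark, skip ahead to the test it was on.
    if os.path.isfile(RESUME_FILE):
        with open(RESUME_FILE, 'r') as f:
            last_name = f.read().strip()
        names = [t.name for t in all_tests]
        if last_name in names:
            all_tests = all_tests[names.index(last_name):]

    for test in all_tests:
        # Record the test in flight before it starts, so a crash leaves a marker behind.
        with open(RESUME_FILE, 'w') as f:
            f.write(test.name)
        run_one(test)

    # A clean finish removes the marker, so the next run starts from the beginning.
    os.remove(RESUME_FILE)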

+ 117 - 0
toolset/benchmark/framework_test.py

@@ -12,6 +12,8 @@ import traceback
 import json
 import textwrap
 import logging
+import csv
+import shlex
 
 
 class FrameworkTest:
   ##########################################################################################
@@ -43,6 +45,14 @@ class FrameworkTest:
     echo ""
     echo ""
     {wrk} {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}"
     {wrk} {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}"
     sleep 5
     sleep 5
+
+    echo ""
+    echo "---------------------------------------------------------"
+    echo " Synchronizing time"
+    echo "---------------------------------------------------------"
+    echo ""
+    ntpdate -s pool.ntp.org
+
    for c in {interval}
    do
      echo ""
@@ -51,7 +61,10 @@ class FrameworkTest:
       echo " {wrk} {headers} -d {duration} -c $c --timeout $c -t $(($c>{max_threads}?{max_threads}:$c)) \"http://{server_host}:{port}{url}\" -s ~/pipeline.lua -- {pipeline}"
       echo " {wrk} {headers} -d {duration} -c $c --timeout $c -t $(($c>{max_threads}?{max_threads}:$c)) \"http://{server_host}:{port}{url}\" -s ~/pipeline.lua -- {pipeline}"
       echo "---------------------------------------------------------"
       echo "---------------------------------------------------------"
       echo ""
       echo ""
+      STARTTIME=$(date +"%s")
      {wrk} {headers} -d {duration} -c $c --timeout $c -t "$(($c>{max_threads}?{max_threads}:$c))" http://{server_host}:{port}{url} -s ~/pipeline.lua -- {pipeline}
+      echo "STARTTIME $STARTTIME"
+      echo "ENDTIME $(date +"%s")"
      sleep 2
    done
  """
@@ -75,6 +88,14 @@ class FrameworkTest:
     echo ""
     echo ""
     wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}2"
     wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}2"
     sleep 5
     sleep 5
+
+    echo ""
+    echo "---------------------------------------------------------"
+    echo " Synchronizing time"
+    echo "---------------------------------------------------------"
+    echo ""
+    ntpdate -s pool.ntp.org
+
    for c in {interval}
    do
      echo ""
@@ -83,7 +104,10 @@ class FrameworkTest:
       echo " wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} \"http://{server_host}:{port}{url}$c\""
       echo " wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} \"http://{server_host}:{port}{url}$c\""
       echo "---------------------------------------------------------"
       echo "---------------------------------------------------------"
       echo ""
       echo ""
+      STARTTIME=$(date +"%s")
      wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}$c"
+      echo "STARTTIME $STARTTIME"
+      echo "ENDTIME $(date +"%s")"
      sleep 2
    done
  """
@@ -540,8 +564,11 @@ class FrameworkTest:
            pass
        if self.json_url_passed:
          remote_script = self.__generate_concurrency_script(self.json_url, self.port, self.accept_json)
+          self.__begin_logging(self.JSON)
          self.__run_benchmark(remote_script, output_file, err)
+          self.__end_logging()
        results = self.__parse_test(self.JSON)
+        print results
        self.benchmarker.report_results(framework=self, test=self.JSON, results=results['results'])
        out.write( "Complete\n" )
        out.flush()
@@ -565,7 +592,9 @@ class FrameworkTest:
            pass
        if self.db_url_passed:
          remote_script = self.__generate_concurrency_script(self.db_url, self.port, self.accept_json)
+          self.__begin_logging(self.DB)
          self.__run_benchmark(remote_script, output_file, err)
+          self.__end_logging()
        results = self.__parse_test(self.DB)
        self.benchmarker.report_results(framework=self, test=self.DB, results=results['results'])
        out.write( "Complete\n" )
@@ -589,7 +618,9 @@ class FrameworkTest:
            pass
        if self.query_url_passed:
          remote_script = self.__generate_query_script(self.query_url, self.port, self.accept_json)
+          self.__begin_logging(self.QUERY)
          self.__run_benchmark(remote_script, output_file, err)
+          self.__end_logging()
        results = self.__parse_test(self.QUERY)
        self.benchmarker.report_results(framework=self, test=self.QUERY, results=results['results'])
        out.write( "Complete\n" )
@@ -610,7 +641,9 @@ class FrameworkTest:
            pass
        if self.fortune_url_passed:
          remote_script = self.__generate_concurrency_script(self.fortune_url, self.port, self.accept_html)
+          self.__begin_logging(self.FORTUNE)
          self.__run_benchmark(remote_script, output_file, err)
+          self.__end_logging()
        results = self.__parse_test(self.FORTUNE)
        self.benchmarker.report_results(framework=self, test=self.FORTUNE, results=results['results'])
        out.write( "Complete\n" )
@@ -631,7 +664,9 @@ class FrameworkTest:
            pass
        if self.update_url_passed:
          remote_script = self.__generate_query_script(self.update_url, self.port, self.accept_json)
+          self.__begin_logging(self.UPDATE)
          self.__run_benchmark(remote_script, output_file, err)
+          self.__end_logging()
        results = self.__parse_test(self.UPDATE)
        self.benchmarker.report_results(framework=self, test=self.UPDATE, results=results['results'])
        out.write( "Complete\n" )
@@ -652,7 +687,9 @@ class FrameworkTest:
            pass
        if self.plaintext_url_passed:
          remote_script = self.__generate_concurrency_script(self.plaintext_url, self.port, self.accept_plaintext, wrk_command="wrk", intervals=[256,1024,4096,16384], pipeline="16")
+          self.__begin_logging(self.PLAINTEXT)
          self.__run_benchmark(remote_script, output_file, err)
+          self.__end_logging()
        results = self.__parse_test(self.PLAINTEXT)
        self.benchmarker.report_results(framework=self, test=self.PLAINTEXT, results=results['results'])
        out.write( "Complete\n" )
@@ -788,6 +825,15 @@ class FrameworkTest:
                 m = re.search("Non-2xx or 3xx responses: ([0-9]+)", line)
                 m = re.search("Non-2xx or 3xx responses: ([0-9]+)", line)
                 if m != None: 
                 if m != None: 
                   rawData['5xx'] = int(m.group(1))
                   rawData['5xx'] = int(m.group(1))
+              if "STARTTIME" in line:
+                m = re.search("[0-9]+", line)
+                rawData["startTime"] = int(m.group(0))
+              if "ENDTIME" in line:
+                m = re.search("[0-9]+", line)
+                rawData["endTime"] = int(m.group(0))
+                stats = self.__parse_stats(test_type, rawData["startTime"], rawData["endTime"], 1)
+                with open(self.benchmarker.stats_file(self.name, test_type) + ".json", "w") as stats_file:
+                  json.dump(stats, stats_file)
      return results
@@ -902,6 +948,77 @@ class FrameworkTest:
              self.contains_type(self.DB) or 
              self.contains_type(self.QUERY) or
              self.contains_type(self.UPDATE))
+  ############################################################
+  # __begin_logging
+  # Starts a dstat subprocess to monitor resource usage, synced with the client's timing
+  # TODO: MySQL and InnoDB are possible. Figure out how to implement them.
+  ############################################################
+  def __begin_logging(self, test_name):
+    output_file = "{file_name}".format(file_name=self.benchmarker.get_stats_file(self.name, test_name))
+    dstat_string = "dstat -afilmprsT --aio --fs --ipc --lock --raw --socket --tcp \
+                                      --udp --unix --vm --disk-util \
+                                      --rpc --rpcd --output {output_file}".format(output_file=output_file)
+    cmd = shlex.split(dstat_string)
+    dev_null = open(os.devnull, "w")
+    self.subprocess_handle = subprocess.Popen(cmd, stdout=dev_null)
+  ##############################################################
+  # End __begin_logging
+  ##############################################################
+
+  ##############################################################
+  # Begin __end_logging
+  # Stops the dstat process and blocks until shutdown is complete.
+  ##############################################################
+  def __end_logging(self):
+    self.subprocess_handle.terminate()
+    self.subprocess_handle.communicate()
+  ##############################################################
+  # End __end_logging
+  ##############################################################
+
+  ##############################################################
+  # Begin __parse_stats
+  # For each test type, process all the statistics, and return a multi-layered dictionary
+  # that has a structure as follows:
+  # (timestamp)
+  # | (main header) - group that the stat is in
+  # | | (sub header) - title of the stat
+  # | | | (stat) - the stat itself, usually a floating point number
+  ##############################################################
+  def __parse_stats(self, test_type, start_time, end_time, interval):
+    stats_dict = dict()
+    stats_file = self.benchmarker.stats_file(self.name, test_type)
+    with open(stats_file) as stats:
+      while(stats.next() != "\n"):
+        pass
+      stats_reader = csv.reader(stats)
+      h1 = stats_reader.next()
+      h2 = stats_reader.next()
+      time_row = h2.index("epoch")
+      int_counter = 0
+      for row in stats_reader:
+        time = float(row[time_row])
+        int_counter += 1
+        if time < start_time:
+          continue
+        elif time > end_time:
+          return stats_dict
+        if int_counter % interval != 0:
+          continue
+        row_dict = dict()
+        for nextheader in h1:
+          if nextheader != "":
+            row_dict[nextheader] = dict()
+        header = ""
+        for item_num in range(len(row)):
+          if(len(h1[item_num]) != 0):
+            header = h1[item_num]
+          row_dict[header][h2[item_num]] = row[item_num]
+        stats_dict[time] = row_dict
+    return stats_dict
+  ##############################################################
+  # End __parse_stats
+  ##############################################################
 
 
  ##########################################################################################
  # Constructor
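Note on __parse_stats above: dstat's --output CSV begins with a few metadata lines and a blank line, then two header rows (group names, then column names, one of which is "epoch"), then one data row per sample. The routine keeps only the rows whose epoch falls inside the client's start/end window and nests them by group and column. A rough standalone sketch of that flow (the exact dstat column layout is assumed here, not taken from this commit):

import csv

def parse_dstat_window(path, start_time, end_time):
    # Returns {epoch: {group_header: {column_name: value}}} for samples
    # taken between start_time and end_time.
    stats = {}
    with open(path) as f:
        # Skip dstat's metadata preamble, which ends with a blank line.
        for line in f:
            if line.strip() == "":
                break
        reader = csv.reader(f)
        groups = next(reader)    # e.g. "total cpu usage", "", "", "memory usage", ...
        columns = next(reader)   # e.g. "usr", "sys", "idl", "used", ..., "epoch"
        epoch_col = columns.index("epoch")
        for row in reader:
            if not row:
                continue
            ts = float(row[epoch_col])
            if ts < start_time:
                continue
            if ts > end_time:
                break
            snapshot = {}
            group = ""
            for i, value in enumerate(row):
                if groups[i]:          # an empty cell continues the previous group
                    group = groups[i]
                snapshot.setdefault(group, {})[columns[i]] = value
            stats[ts] = snapshot
    return stats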

+ 2 - 2
toolset/setup/linux/prerequisites.sh

@@ -29,7 +29,7 @@ sudo apt-get -y install \
  libgstreamer-plugins-base0.10-0 libgstreamer0.10-0 \
  liborc-0.4-0 libwxbase2.8-0 libwxgtk2.8-0 libgnutls-dev \
  libjson0-dev libmcrypt-dev libicu-dev gettext \
-  libpq-dev mlton cloc
+  libpq-dev mlton cloc dstat
 
 
 sudo add-apt-repository ppa:ubuntu-toolchain-r/test -y
 sudo apt-get -y update
@@ -47,4 +47,4 @@ RETCODE=$(fw_exists ~/.bash_profile.bak)
 
 
 sudo sh -c "echo '*               -    nofile          65535' >> /etc/security/limits.conf"
 
 
-touch fwbm_prereqs_installed
+touch fwbm_prereqs_installed