Browse Source

Fixes #927 (terminate child processes) for linux systems

Fix this by:
- Using linux process groups and session identifier to track created processes
- Automatically send SIGTERM to processes when needed
- Use SIGTERM handler in subprocess to call setup_module.stop()
- Use boolean flag to ensure stop() is only called once
- Removes checks for KeyboardInterrupt from subprocess (this has known bugs with multiprocessing)
- Removes check for system exit from subprocess, not required
Hamilton Turner 11 years ago
parent
commit
0421109f8a
1 changed files with 54 additions and 12 deletions
  1. 54 12
      toolset/benchmark/benchmarker.py

+ 54 - 12
toolset/benchmark/benchmarker.py

@@ -16,6 +16,7 @@ import logging
 log = logging.getLogger('benchmarker')
 log = logging.getLogger('benchmarker')
 import socket
 import socket
 import glob
 import glob
+import signal
 from multiprocessing import Process
 from multiprocessing import Process
 from datetime import datetime
 from datetime import datetime
 
 
@@ -493,18 +494,45 @@ class Benchmarker:
         self.__run_test(test)
         self.__run_test(test)
     else:
     else:
       log.info("Executing __run_tests on Linux")
       log.info("Executing __run_tests on Linux")
-      # These features do not work on Windows
       for test in tests:
       for test in tests:
         if __name__ == 'benchmark.benchmarker':
         if __name__ == 'benchmark.benchmarker':
           print Header("Running Test: %s" % test.name)
           print Header("Running Test: %s" % test.name)
           test_process = Process(target=self.__run_test, name="Test Runner (%s)" % test.name, args=(test,))
           test_process = Process(target=self.__run_test, name="Test Runner (%s)" % test.name, args=(test,))
           test_process.start()
           test_process.start()
-          test_process.join(self.run_test_timeout_seconds)
+          try:
+            test_process.join(self.run_test_timeout_seconds)
+          except KeyboardInterrupt:
+            # Politely SIGTERM everyong in the session
+            subprocess.call("sudo pkill -s %s" % test_process.pid, shell=True)
+            time.sleep(5)
+
           self.__load_results()  # Load intermediate result from child process
           self.__load_results()  # Load intermediate result from child process
           if(test_process.is_alive()):
           if(test_process.is_alive()):
             log.warn("Child process for {name} is still alive. Terminating.".format(name=test.name))
             log.warn("Child process for {name} is still alive. Terminating.".format(name=test.name))
             self.__write_intermediate_results(test.name,"__run_test timeout (="+ str(self.run_test_timeout_seconds) + " seconds)")
             self.__write_intermediate_results(test.name,"__run_test timeout (="+ str(self.run_test_timeout_seconds) + " seconds)")
-            test_process.terminate()
+            
+            # Politely SIGTERM everyone in the original process group
+            try:
+              print "SIGTERM TO PID %s" % test_process.pid
+              os.killpg(test_process.pid, signal.SIGTERM)
+            except OSError:
+              print "Failed"
+              pass
+            # Politely SIGTERM everyong in the session (this takes care
+            # of most servers that start up as long-running daemons)
+            subprocess.call("sudo pkill -s %s" % test_process.pid, shell=True)
+
+            # Some time to close up shop
+            time.sleep(3)
+
+            # SIGKILL any stragglers
+            try:
+              print "SIGKILL TO PID %s" % test_process.pid
+              os.killpg(test_process.pid, signal.SIGKILL)
+            except OSError:
+              print "Failed"
+              pass
+            subprocess.call("sudo pkill -SIGKILL -s %s" % test_process.pid, shell=True)
           log.handlers = []  # Clean up handlers left by __run_test
           log.handlers = []  # Clean up handlers left by __run_test
     log.info("End __run_tests")
     log.info("End __run_tests")
 
 
@@ -521,6 +549,24 @@ class Benchmarker:
   def __run_test(self, test):
   def __run_test(self, test):
     log.info("__run_test")
     log.info("__run_test")
 
 
+    if self.os.lower() != 'windows':
+      # Set our process ID to be the current session ID
+      os.setsid()
+
+      self.sigterm_has_fired = False
+      def signal_term_handler(signal, frame):
+        if self.sigterm_has_fired:
+          print "SIGTERM fired before, ignoring"
+          return 0
+        else:
+          print "SIGTERM fired"
+          self.sigterm_has_fired=True
+          log.info(Header("Cleaning up..."))
+          test.stop(log)
+          return 0
+
+      signal.signal(signal.SIGTERM, signal_term_handler)
+
     logfile = os.path.join(self.latest_results_directory, 'logs', test.name, 'output.log')
     logfile = os.path.join(self.latest_results_directory, 'logs', test.name, 'output.log')
     try:
     try:
       os.makedirs(os.path.dirname(logfile))
       os.makedirs(os.path.dirname(logfile))
@@ -622,9 +668,10 @@ class Benchmarker:
         test.benchmark(log)
         test.benchmark(log)
 
 
       # Stop this test
       # Stop this test
-      log.info(Header("Stopping %s" % test.name))
-      test.stop(log)
-      time.sleep(5)
+      if self.os.lower() == 'windows' or not self.sigterm_has_fired:
+        log.info(Header("Stopping %s" % test.name))
+        test.stop(log)
+        time.sleep(5)
 
 
       if self.__is_port_bound(test.port):
       if self.__is_port_bound(test.port):
         self.__write_intermediate_results(test.name, "port %s was not released by stop" % test.port)
         self.__write_intermediate_results(test.name, "port %s was not released by stop" % test.port)
@@ -648,17 +695,12 @@ class Benchmarker:
       log.debug("Subprocess Error Details", exc_info=True)
       log.debug("Subprocess Error Details", exc_info=True)
       try:
       try:
         test.stop(log)
         test.stop(log)
-      except (subprocess.CalledProcessError) as e:
+      except subprocess.CalledProcessError as e:
         self.__write_intermediate_results(test.name,"<setup.py>#stop() raised an error")
         self.__write_intermediate_results(test.name,"<setup.py>#stop() raised an error")
         log.error(Header("Subprocess Error: Test .stop() raised exception %s" % test.name))
         log.error(Header("Subprocess Error: Test .stop() raised exception %s" % test.name))
         log.error("%s", e)
         log.error("%s", e)
         log.error("%s", sys.exc_info()[:2])
         log.error("%s", sys.exc_info()[:2])
         log.debug("Subprocess Error Details", exc_info=True)
         log.debug("Subprocess Error Details", exc_info=True)
-    except (KeyboardInterrupt, SystemExit) as e:
-      test.stop(log)
-      log.info(Header("Cleaning up..."))
-      self.__finish()
-      sys.exit()
   ############################################################
   ############################################################
   # End __run_test
   # End __run_test
   ############################################################
   ############################################################