123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- from toolset.utils.output_helper import header
- from toolset.utils.metadata_helper import gather_tests, gather_remaining_tests
- from toolset.utils import docker_helper
- import os
- import subprocess
- import traceback
- import sys
- import logging
- import socket
- import time
- import json
- import shlex
- from pprint import pprint
- from multiprocessing import Process
- class Benchmarker:
- def __init__(self, config, results):
- '''
- Initialize the benchmarker.
- '''
- self.config = config
- self.results = results
- ##########################################################################################
- # Public methods
- ##########################################################################################
- def run(self):
- '''
- This process involves setting up the client/server machines
- with any necessary change. Then going through each test,
- running their setup script, verifying the URLs, and
- running benchmarks against them.
- '''
- # Generate metadata
- self.__run_list_test_metadata()
- # Get a list of all known tests that we can run.
- all_tests = gather_remaining_tests(self.config, self.results)
- # Setup client/server
- print(
- header(
- "Preparing Server, Database, and Client ...",
- top='=',
- bottom='='))
- with self.config.quiet_out.enable():
- self.__setup_server()
- self.__setup_database()
- self.__setup_client()
- # Run tests
- print(header("Running Tests...", top='=', bottom='='))
- result = self.__run_tests(all_tests)
- # Parse results
- if self.config.mode == "benchmark":
- print(header("Parsing Results ...", top='=', bottom='='))
- self.results.parse(all_tests)
- self.results.set_completion_time()
- self.results.upload()
- self.results.finish()
- return result
- ##########################################################################################
- # Private methods
- ##########################################################################################
- def __benchmark(self, framework_test, logPath):
- '''
- Runs the benchmark for each type of test that it implements
- '''
- def benchmark_type(test_type):
- benchmarkPath = os.path.join(logPath, test_type)
- try:
- os.makedirs(benchmarkPath)
- except OSError:
- pass
- with open(os.path.join(benchmarkPath, 'benchmark.txt'),
- 'w') as out:
- out.write("BENCHMARKING %s ... " % test_type.upper())
- test = framework_test.runTests[test_type]
- test.setup_out(out)
- raw_file = self.results.get_raw_file(framework_test.name,
- test_type)
- if not os.path.exists(raw_file):
- # Open to create the empty file
- with open(raw_file, 'w'):
- pass
- if not test.failed:
- remote_script = self.config.types[
- test_type].get_remote_script(self.config, test.name,
- test.get_url(),
- framework_test.port)
- # Begin resource usage metrics collection
- self.__begin_logging(framework_test, test_type)
- # Run the benchmark
- with open(raw_file, 'w') as raw_file:
- p = subprocess.Popen(
- self.config.client_ssh_string.split(" "),
- stdin=subprocess.PIPE,
- stdout=raw_file,
- stderr=raw_file)
- p.communicate(remote_script)
- out.flush()
- # End resource usage metrics collection
- self.__end_logging()
- results = self.results.parse_test(framework_test, test_type)
- print("Benchmark results:")
- pprint(results)
- self.results.report_benchmark_results(
- framework_test, test_type, results['results'])
- out.write("Complete\n")
- out.flush()
- for test_type in framework_test.runTests:
- benchmark_type(test_type)
- def __run_list_test_metadata(self):
- '''
- Prints the metadata for all the available tests
- '''
- all_tests = gather_tests(benchmarker_config=self.config)
- all_tests_json = json.dumps(map(lambda test: {
- "name": test.name,
- "approach": test.approach,
- "classification": test.classification,
- "database": test.database,
- "framework": test.framework,
- "language": test.language,
- "orm": test.orm,
- "platform": test.platform,
- "webserver": test.webserver,
- "os": test.os,
- "database_os": test.database_os,
- "display_name": test.display_name,
- "notes": test.notes,
- "versus": test.versus
- }, all_tests))
- with open(
- os.path.join(self.results.directory, "test_metadata.json"),
- "w") as f:
- f.write(all_tests_json)
- def __begin_logging(self, framework_test, test_type):
- '''
- Starts a thread to monitor the resource usage, to be synced with the
- client's time.
- TODO: MySQL and InnoDB are possible. Figure out how to implement them.
- '''
- output_file = "{file_name}".format(
- file_name=self.results.get_stats_file(framework_test.name,
- test_type))
- dstat_string = "dstat -Tafilmprs --aio --fs --ipc --lock --raw --socket --tcp \
- --raw --socket --tcp --udp --unix --vm --disk-util \
- --rpc --rpcd --output {output_file}".format(
- output_file=output_file)
- cmd = shlex.split(dstat_string)
- dev_null = open(os.devnull, "w")
- self.subprocess_handle = subprocess.Popen(
- cmd, stdout=dev_null, stderr=subprocess.STDOUT)
- def __end_logging(self):
- '''
- Stops the logger thread and blocks until shutdown is complete.
- '''
- self.subprocess_handle.terminate()
- self.subprocess_handle.communicate()
- def __setup_server(self):
- '''
- Makes any necessary changes to the server that should be
- made before running the tests. This involves setting kernal
- settings to allow for more connections, or more file
- descriptiors
-
- http://redmine.lighttpd.net/projects/weighttp/wiki#Troubleshooting
- '''
- try:
- if os.name == 'nt':
- return True
- subprocess.call(
- ['sudo', 'sysctl', '-w', 'net.ipv4.tcp_max_syn_backlog=65535'],
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- subprocess.call(
- ['sudo', 'sysctl', '-w', 'net.core.somaxconn=65535'],
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- subprocess.call(
- ['sudo', 'sysctl', 'net.ipv4.tcp_tw_reuse=1'],
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- subprocess.call(
- ['sudo', 'sysctl', 'net.ipv4.tcp_tw_recycle=1'],
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- subprocess.call(
- ['sudo', 'sysctl', '-w', 'kernel.shmmax=134217728'],
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- subprocess.call(
- ['sudo', 'sysctl', '-w', 'kernel.shmall=2097152'],
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- with open(os.path.join(self.results.directory, 'sysctl.txt'),
- 'w') as f:
- f.write(subprocess.check_output(['sudo', 'sysctl', '-a']))
- except subprocess.CalledProcessError:
- return False
- def __setup_database(self):
- '''
- Makes any necessary changes to the database machine that should be made
- before running the tests. Is very similar to the server setup, but may also
- include database specific changes.
- Explanations:
- net.ipv4.tcp_max_syn_backlog, net.core.somaxconn, kernel.sched_autogroup_enabled: http://tweaked.io/guide/kernel/
- ulimit -n: http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
- net.ipv4.tcp_tw_*: http://www.linuxbrigade.com/reduce-time_wait-socket-connections/
- kernel.shm*: http://seriousbirder.com/blogs/linux-understanding-shmmax-and-shmall-settings/
- For kernel.sem: https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/5/html/Tuning_and_Optimizing_Red_Hat_Enterprise_Linux_for_Oracle_9i_and_10g_Databases/chap-Oracle_9i_and_10g_Tuning_Guide-Setting_Semaphores.html
- '''
- p = subprocess.Popen(
- self.config.database_ssh_string,
- stdin=subprocess.PIPE,
- shell=True,
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- p.communicate("""
- sudo sysctl -w net.ipv4.tcp_max_syn_backlog=65535
- sudo sysctl -w net.core.somaxconn=65535
- sudo sysctl -w kernel.sched_autogroup_enabled=0
- sudo -s ulimit -n 65535
- sudo sysctl net.ipv4.tcp_tw_reuse=1
- sudo sysctl net.ipv4.tcp_tw_recycle=1
- sudo sysctl -w kernel.shmmax=2147483648
- sudo sysctl -w kernel.shmall=2097152
- sudo sysctl -w kernel.sem="250 32000 256 512"
- """)
- # TODO - print kernel configuration to file
- # echo "Printing kernel configuration:" && sudo sysctl -a
- def __setup_client(self):
- '''
- Makes any necessary changes to the client machine that should be made
- before running the tests. Is very similar to the server setup, but may also
- include client specific changes.
- '''
- p = subprocess.Popen(
- self.config.client_ssh_string,
- stdin=subprocess.PIPE,
- shell=True,
- stdout=self.config.quiet_out,
- stderr=subprocess.STDOUT)
- p.communicate("""
- sudo sysctl -w net.ipv4.tcp_max_syn_backlog=65535
- sudo sysctl -w net.core.somaxconn=65535
- sudo -s ulimit -n 65535
- sudo sysctl net.ipv4.tcp_tw_reuse=1
- sudo sysctl net.ipv4.tcp_tw_recycle=1
- sudo sysctl -w kernel.shmmax=2147483648
- sudo sysctl -w kernel.shmall=2097152
- """)
- def __run_tests(self, tests):
- '''
- Calls each test passed in tests to __run_test in a separate process.
- Each test is given a set amount of time and if kills the child process
- (and subsequently all of its child processes).
- '''
- if len(tests) == 0:
- return 0
- logging.debug("Start __run_tests.")
- logging.debug("__name__ = %s", __name__)
- error_happened = False
- if self.config.os.lower() == 'windows':
- logging.debug("Executing __run_tests on Windows")
- for test in tests:
- with self.config.quiet_out.enable():
- if self.__run_test(test) != 0:
- error_happened = True
- else:
- logging.debug("Executing __run_tests on Linux")
- # These features do not work on Windows
- for test in tests:
- print(header("Running Test: %s" % test.name))
- with self.config.quiet_out.enable():
- test_process = Process(
- target=self.__run_test,
- name="Test Runner (%s)" % test.name,
- args=(test, ))
- test_process.start()
- test_process.join(self.config.run_test_timeout_seconds)
- # Load intermediate result from child process
- self.results.load()
- if (test_process.is_alive()):
- logging.debug(
- "Child process for {name} is still alive. Terminating.".
- format(name=test.name))
- self.results.write_intermediate(
- test.name, "__run_test timeout (=" +
- str(self.config.run_test_timeout_seconds) +
- " seconds)")
- test_process.terminate()
- test_process.join()
- if test_process.exitcode != 0:
- error_happened = True
- logging.debug("End __run_tests.")
- if error_happened:
- return 1
- return 0
- def __run_test(self, test):
- '''
- Ensures that the system has all necessary software to run the tests.
- This does not include that software for the individual test, but covers
- software such as curl and weighttp that are needed.
- '''
- logDir = os.path.join(self.results.directory, test.name.lower())
- try:
- os.makedirs(logDir)
- except Exception:
- pass
- with open(os.path.join(logDir, 'out.txt'), 'w') as out:
- if test.os.lower() != self.config.os.lower(
- ) or test.database_os.lower() != self.config.database_os.lower():
- out.write(
- "OS or Database OS specified in benchmark_config.json does not match the current environment. Skipping.\n"
- )
- return sys.exit(0)
- # If the test is in the excludes list, we skip it
- if self.config.exclude != None and test.name in self.config.exclude:
- out.write(
- "Test {name} has been added to the excludes list. Skipping.\n".
- format(name=test.name))
- return sys.exit(0)
- database_container_id = None
- try:
- if self.__is_port_bound(test.port):
- time.sleep(60)
- if self.__is_port_bound(test.port):
- # We gave it our all
- self.results.write_intermediate(test.name, "port " + str(
- test.port) + " is not available before start")
- out.write(
- header(
- "Error: Port %s is not available, cannot start %s"
- % (test.port, test.name)))
- out.flush()
- print("Error: Unable to recover port, cannot start test")
- return sys.exit(1)
- # Start database container
- if test.database.lower() != "none":
- database_container_id = docker_helper.start_database(
- self.config, test.database.lower())
- if not database_container_id:
- out.write(
- "ERROR: Problem building/running database container"
- )
- out.flush()
- self.results.write_intermediate(
- test.name, "ERROR: Problem starting")
- return sys.exit(1)
- # Start webapp
- result = test.start(out, database_container_id)
- if result != 0:
- docker_helper.stop(self.config, database_container_id,
- test, out)
- time.sleep(5)
- out.write("ERROR: Problem starting {name}\n".format(
- name=test.name))
- out.flush()
- self.results.write_intermediate(test.name,
- "ERROR: Problem starting")
- return sys.exit(1)
- logging.info("Sleeping %s seconds to ensure framework is ready"
- % self.config.sleep)
- time.sleep(self.config.sleep)
- # Verify URLs
- if self.config.mode == "debug":
- logging.info(
- "Entering debug mode. Server has started. CTRL-c to stop."
- )
- while True:
- time.sleep(1)
- else:
- logging.info("Verifying framework URLs")
- passed_verify = test.verify_urls(logDir)
- # Benchmark this test
- if self.config.mode == "benchmark":
- logging.info("Benchmarking")
- out.write(header("Benchmarking %s" % test.name))
- out.flush()
- self.__benchmark(test, logDir)
- # Stop this test
- docker_helper.stop(self.config, database_container_id, test,
- out)
- # Remove contents of /tmp folder
- try:
- subprocess.check_call(
- 'sudo rm -rf /tmp/*',
- shell=True,
- stderr=out,
- stdout=out)
- except Exception:
- out.write(header("Error: Could not empty /tmp"))
- # Save results thus far into the latest results directory
- self.results.write_intermediate(test.name,
- time.strftime(
- "%Y%m%d%H%M%S",
- time.localtime()))
- # Upload the results thus far to another server (optional)
- self.results.upload()
- if self.config.mode == "verify" and not passed_verify:
- print("Failed verify!")
- return sys.exit(1)
- except KeyboardInterrupt:
- docker_helper.stop(self.config, database_container_id, test,
- out)
- except (OSError, IOError, subprocess.CalledProcessError) as e:
- traceback.print_exc()
- self.results.write_intermediate(
- test.name, "error during test setup: " + str(e))
- out.write(header("Subprocess Error %s" % test.name))
- traceback.print_exc(file=out)
- out.flush()
- out.close()
- return sys.exit(1)
- out.close()
- return sys.exit(0)
- def __is_port_bound(self, port):
- '''
- Check if the requested port is available. If it isn't available, then a
- previous test probably didn't shutdown properly.
- '''
- port = int(port)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- # Try to bind to all IP addresses, this port
- s.bind(("", port))
- # If we get here, we were able to bind successfully,
- # which means the port is free.
- except socket.error:
- # If we get an exception, it might be because the port is still bound
- # which would be bad, or maybe it is a privileged port (<1024) and we
- # are not running as root, or maybe the server is gone, but sockets are
- # still in TIME_WAIT (SO_REUSEADDR). To determine which scenario, try to
- # connect.
- try:
- s.connect(("127.0.0.1", port))
- # If we get here, we were able to connect to something, which means
- # that the port is still bound.
- return True
- except socket.error:
- # An exception means that we couldn't connect, so a server probably
- # isn't still running on the port.
- pass
- finally:
- s.close()
- return False
|