import threading
from docker.models.containers import Container
from toolset.utils.output_helper import log, FNULL
from toolset.utils.docker_helper import DockerHelper
from toolset.utils.time_logger import TimeLogger
from toolset.utils.metadata import Metadata
from toolset.utils.results import Results
from toolset.utils.audit import Audit
import os
import subprocess
import traceback
import sys
import time
import shlex
from pprint import pprint
from colorama import Fore
import numbers
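
# Note: FNULL (imported above) is used later in this file as the stdout sink
# for the dool subprocess; it is assumed to be a writable null-device handle
# provided by output_helper.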

class Benchmarker:
    def __init__(self, config):
        '''
        Initialize the benchmarker.
        '''
        self.config = config
        self.time_logger = TimeLogger()
        self.metadata = Metadata(self)
        self.audit = Audit(self)

        # a list of all tests for this run
        self.tests = self.metadata.tests_to_run()
        if self.config.reverse_order:
            self.tests.reverse()

        self.results = Results(self)
        self.docker_helper = DockerHelper(self)
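
    # Attributes read from `config` elsewhere in this class: reverse_order
    # (above), mode, exclude, types, server_host and quiet_out. This list is
    # derived from this file alone; the config object may expose more.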

    ##########################################################################################
    # Public methods
    ##########################################################################################
    def run(self):
        '''
        Sets up the client/server machines with any necessary changes, then
        walks through each test: building and running its docker containers,
        verifying its URLs and, in benchmark mode, benchmarking it.
        Returns True if any test failed.
        '''
        # Generate metadata
        self.metadata.list_test_metadata()

        any_failed = False

        # Run tests
        log("Running Tests...", border='=')

        # Build wrk and all databases needed for the current run
        self.docker_helper.build_wrk()
        self.docker_helper.build_databases()

        with open(os.path.join(self.results.directory, 'benchmark.log'),
                  'w') as benchmark_log:
            for test in self.tests:
                log("Running Test: %s" % test.name, border='-')
                with self.config.quiet_out.enable():
                    if not self.__run_test(test, benchmark_log):
                        any_failed = True
                # Load intermediate result from child process
                self.results.load()

        # Parse results
        if self.config.mode == "benchmark":
            log("Parsing Results ...", border='=')
            self.results.parse(self.tests)

        self.results.set_completion_time()
        self.results.upload()
        self.results.finish()

        return any_failed
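
    # A sketch (not part of the toolset) of how a caller might drive this class.
    # The wiring below is an assumption based only on run() and stop(); the real
    # entry point lives elsewhere, and the caller would need to import `signal`.
    #
    #   benchmarker = Benchmarker(config)            # config: parsed run settings
    #   signal.signal(signal.SIGTERM, benchmarker.stop)
    #   signal.signal(signal.SIGINT, benchmarker.stop)
    #   any_failed = benchmarker.run()               # True if any test failed
    #   sys.exit(1 if any_failed else 0)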

    def stop(self, signal=None, frame=None):
        # The (signal, frame) parameters match the handler signature expected by
        # signal.signal(), so this method can be registered directly as a handler.
        log("Shutting down (may take a moment)")
        self.docker_helper.stop()
        sys.exit(0)

    ##########################################################################################
    # Private methods
    ##########################################################################################
    def __exit_test(self, success, prefix, file, message=None):
        '''
        Logs the end of a test, performs post-test cleanup and returns `success`.
        '''
        if message:
            log(message,
                prefix=prefix,
                file=file,
                color=Fore.RED if not success else '')
        self.time_logger.log_test_end(log_prefix=prefix, file=file)
        if self.config.mode == "benchmark":
            total_tcp_sockets = subprocess.check_output(
                "ss -s | grep TCP: | awk '{print $2}'", shell=True, text=True)
            log("Total TCP sockets: " + total_tcp_sockets, prefix=prefix, file=file)
            if int(total_tcp_sockets) > 5000:
                # Sleep for 60 seconds to ensure all host connections are closed
                log("Clean up: Sleep 60 seconds...", prefix=prefix, file=file)
                time.sleep(60)
        # After benchmarks are complete for all test types in this test,
        # let's clean up leftover test images (techempower/tfb.test.test-name)
        self.docker_helper.clean()
        return success
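
    # For reference, in benchmark mode above the socket count is taken from the
    # summary that `ss -s` prints, which on typical iproute2 versions looks like
    # (figures below are illustrative only):
    #
    #   Total: 1024
    #   TCP:   5210 (estab 12, closed 5100, orphaned 0, timewait 5080)
    #   ...
    #
    # `grep TCP: | awk '{print $2}'` therefore extracts the second field of the
    # TCP summary line (5210 here), i.e. the total number of TCP sockets in any
    # state, and the 60-second sleep gives lingering sockets a chance to drain.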

    def __run_test(self, test, benchmark_log):
        '''
        Runs the given test, verifies that the webapp is accepting requests,
        optionally benchmarks the webapp, and ultimately stops all services
        started for this test.
        '''
        log_prefix = "%s: " % test.name

        # Start timing the total test duration
        self.time_logger.mark_test_start()

        if self.config.mode == "benchmark":
            log("Benchmarking %s" % test.name,
                file=benchmark_log,
                border='-')

        # If the test is in the excludes list, we skip it
        if self.config.exclude and test.name in self.config.exclude:
            message = "Test {name} has been added to the excludes list. Skipping.".format(
                name=test.name)
            self.results.write_intermediate(test.name, message)
            self.results.upload()
            return self.__exit_test(
                success=False,
                message=message,
                prefix=log_prefix,
                file=benchmark_log)

        database_container = None
        try:
            # Start database container
            if test.database.lower() != "none":
                self.time_logger.mark_starting_database()
                database_container = self.docker_helper.start_database(
                    test.database.lower())
                if database_container is None:
                    message = "ERROR: Problem building/running database container"
                    self.results.write_intermediate(test.name, message)
                    self.results.upload()
                    return self.__exit_test(
                        success=False,
                        message=message,
                        prefix=log_prefix,
                        file=benchmark_log)
                self.time_logger.mark_started_database()

            # Start webapp
            container = test.start()
            self.time_logger.mark_test_starting()
            if container is None:
                message = "ERROR: Problem starting {name}".format(
                    name=test.name)
                self.results.write_intermediate(test.name, message)
                self.results.upload()
                return self.__exit_test(
                    success=False,
                    message=message,
                    prefix=log_prefix,
                    file=benchmark_log)

            # Give the webapp up to 60 seconds to start accepting requests
            max_time = time.time() + 60
            while True:
                accepting_requests = test.is_accepting_requests()
                if accepting_requests \
                        or time.time() >= max_time \
                        or not self.docker_helper.server_container_exists(container.id):
                    break
                time.sleep(1)

            # Some tests declare an optional settle period before traffic is sent
            if hasattr(test, 'wait_before_sending_requests') and \
                    isinstance(test.wait_before_sending_requests, numbers.Integral) and \
                    test.wait_before_sending_requests > 0:
                time.sleep(test.wait_before_sending_requests)

            if not accepting_requests:
                message = "ERROR: Framework is not accepting requests from client machine"
                self.results.write_intermediate(test.name, message)
                self.results.upload()
                return self.__exit_test(
                    success=False,
                    message=message,
                    prefix=log_prefix,
                    file=benchmark_log)

            self.time_logger.mark_test_accepting_requests()

            # Debug mode blocks execution here until ctrl+c
            if self.config.mode == "debug":
                msg = "Entering debug mode. Server http://localhost:%s has started. CTRL-c to stop.\r\n" % test.port
                msg = msg + "From outside vagrant: http://localhost:%s" % (int(test.port) + 20000)
                log(msg,
                    prefix=log_prefix,
                    file=benchmark_log,
                    color=Fore.YELLOW)
                while True:
                    time.sleep(1)

            # Verify URLs and audit
            log("Verifying framework URLs", prefix=log_prefix)
            self.time_logger.mark_verify_start()
            passed_verify = test.verify_urls()
            self.audit.audit_test_dir(test.directory)

            # Benchmark this test
            if self.config.mode == "benchmark":
                self.time_logger.mark_benchmarking_start()
                self.__benchmark(test, benchmark_log)
                self.time_logger.log_benchmarking_end(
                    log_prefix=log_prefix, file=benchmark_log)

            # Log test timing stats
            self.time_logger.log_build_flush(benchmark_log)
            self.time_logger.log_database_start_time(log_prefix, benchmark_log)
            self.time_logger.log_test_accepting_requests(
                log_prefix, benchmark_log)
            self.time_logger.log_verify_end(log_prefix, benchmark_log)

            # Save results thus far into the latest results directory
            self.results.write_intermediate(test.name,
                                            time.strftime(
                                                "%Y%m%d%H%M%S",
                                                time.localtime()))

            # Upload the results thus far to another server (optional)
            self.results.upload()

            if self.config.mode == "verify" and not passed_verify:
                return self.__exit_test(
                    success=False,
                    message="Failed verify!",
                    prefix=log_prefix,
                    file=benchmark_log)

        except Exception as e:
            tb = traceback.format_exc()
            self.results.write_intermediate(test.name,
                                            "error during test: " + str(e))
            self.results.upload()
            log(tb, prefix=log_prefix, file=benchmark_log)
            return self.__exit_test(
                success=False,
                message="Error during test: %s" % test.name,
                prefix=log_prefix,
                file=benchmark_log)
        finally:
            self.docker_helper.stop()

        return self.__exit_test(
            success=True, prefix=log_prefix, file=benchmark_log)

    def __benchmark(self, framework_test, benchmark_log):
        '''
        Runs the benchmark for each type of test that it implements
        '''

        def benchmark_type(test_type):
            log("BENCHMARKING %s ... " % test_type.upper(), file=benchmark_log)

            test = framework_test.runTests[test_type]

            if not test.failed:
                # Begin resource usage metrics collection
                self.__begin_logging(framework_test, test_type)

                script = self.config.types[test_type].get_script_name()
                script_variables = self.config.types[
                    test_type].get_script_variables(
                        test.name, "http://%s:%s%s" % (self.config.server_host,
                                                       framework_test.port,
                                                       test.get_url()))

                benchmark_container = self.docker_helper.benchmark(script, script_variables)
                self.__log_container_output(benchmark_container, framework_test, test_type)

                # End resource usage metrics collection
                self.__end_logging()

                results = self.results.parse_test(framework_test, test_type)
                log("Benchmark results:", file=benchmark_log)
                # TODO move into log somehow
                pprint(results)

                self.results.report_benchmark_results(framework_test, test_type,
                                                      results['results'])
                log("Complete", file=benchmark_log)

        for test_type in framework_test.runTests:
            benchmark_type(test_type)

    def __begin_logging(self, framework_test, test_type):
        '''
        Starts a dool subprocess to monitor host resource usage, to be synced
        with the client's time.
        TODO: MySQL and InnoDB are possible. Figure out how to implement them.
        '''
        output_file = self.results.get_stats_file(framework_test.name,
                                                  test_type)
        dool_string = "dool -Tafilmprs --aio --fs --ipc --lock --socket --tcp \
                --raw --udp --unix --vm --disk-util \
                --rpc --rpcd --output {output_file}".format(
            output_file=output_file)
        cmd = shlex.split(dool_string)
        self.subprocess_handle = subprocess.Popen(
            cmd, stdout=FNULL, stderr=subprocess.STDOUT)
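
    # Note: dool is a maintained fork of dstat. With --output it appends one CSV
    # sample per polling interval (dool's default interval, assumed here to be
    # one second since none is passed above) to the stats file obtained from
    # self.results.get_stats_file(...).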

    def __end_logging(self):
        '''
        Stops the dool subprocess and blocks until shutdown is complete.
        '''
        self.subprocess_handle.terminate()
        self.subprocess_handle.communicate()

    def __log_container_output(self, container: Container, framework_test, test_type) -> None:
        def save_docker_logs(stream):
            raw_file_path = self.results.get_raw_file(framework_test.name, test_type)
            with open(raw_file_path, 'w') as file:
                for line in stream:
                    log(line.decode(), file=file)

        def save_docker_stats(stream):
            # The stats stream yields one JSON document per sample; wrap them in
            # a JSON array so the file can be parsed as a whole.
            docker_file_path = self.results.get_docker_stats_file(framework_test.name, test_type)
            with open(docker_file_path, 'w') as file:
                file.write('[\n')
                is_first_line = True
                for line in stream:
                    if is_first_line:
                        is_first_line = False
                    else:
                        file.write(',')
                    file.write(line.decode())
                file.write(']')

        # Stream the container's logs and stats concurrently, each to its own
        # results file.
        threads = [
            threading.Thread(target=lambda: save_docker_logs(container.logs(stream=True))),
            threading.Thread(target=lambda: save_docker_stats(container.stats(stream=True)))
        ]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
|