# benchmarker.py
  1. import threading
  2. from docker.models.containers import Container
  3. from toolset.utils.output_helper import log, FNULL
  4. from toolset.utils.docker_helper import DockerHelper
  5. from toolset.utils.time_logger import TimeLogger
  6. from toolset.utils.metadata import Metadata
  7. from toolset.utils.results import Results
  8. from toolset.utils.audit import Audit
  9. import os
  10. import subprocess
  11. import traceback
  12. import sys
  13. import time
  14. import shlex
  15. from pprint import pprint
  16. from colorama import Fore
  17. import numbers
class Benchmarker:
    def __init__(self, config):
        '''
        Initialize the benchmarker.

        config: parsed run configuration (mode, exclude list, reverse_order,
        quiet_out, types, server_host, ...) consulted throughout the run.
        '''
        self.config = config
        self.time_logger = TimeLogger()  # tracks per-phase durations for reporting
        self.metadata = Metadata(self)
        self.audit = Audit(self)
        # a list of all tests for this run
        self.tests = self.metadata.tests_to_run()
        if self.config.reverse_order:
            # NOTE(review): presumably lets alternating runs exercise tests in
            # the opposite order — confirm against the CLI flag's docs
            self.tests.reverse()
        self.results = Results(self)
        self.docker_helper = DockerHelper(self)
  33. ##########################################################################################
  34. # Public methods
  35. ##########################################################################################
  36. def run(self):
  37. '''
  38. This process involves setting up the client/server machines
  39. with any necessary change. Then going through each test,
  40. running their docker build and run, verifying the URLs, and
  41. running benchmarks against them.
  42. '''
  43. # Generate metadata
  44. self.metadata.list_test_metadata()
  45. any_failed = False
  46. # Run tests
  47. log("Running Tests...", border='=')
  48. # build wrk and all databases needed for current run
  49. self.docker_helper.build_wrk()
  50. self.docker_helper.build_databases()
  51. with open(os.path.join(self.results.directory, 'benchmark.log'),
  52. 'w') as benchmark_log:
  53. for test in self.tests:
  54. log("Running Test: %s" % test.name, border='-')
  55. with self.config.quiet_out.enable():
  56. if not self.__run_test(test, benchmark_log):
  57. any_failed = True
  58. # Load intermediate result from child process
  59. self.results.load()
  60. # Parse results
  61. if self.config.mode == "benchmark":
  62. log("Parsing Results ...", border='=')
  63. self.results.parse(self.tests)
  64. self.results.set_completion_time()
  65. self.results.upload()
  66. self.results.finish()
  67. return any_failed
    def stop(self, signal=None, frame=None):
        '''
        Shut down the run: stop all docker containers and exit the process.

        signal/frame match the (signum, frame) signature expected by
        signal.signal handlers; both are unused here.
        '''
        log("Shutting down (may take a moment)")
        self.docker_helper.stop()
        sys.exit(0)
  72. ##########################################################################################
  73. # Private methods
  74. ##########################################################################################
  75. def __exit_test(self, success, prefix, file, message=None):
  76. if message:
  77. log(message,
  78. prefix=prefix,
  79. file=file,
  80. color=Fore.RED if success else '')
  81. self.time_logger.log_test_end(log_prefix=prefix, file=file)
  82. if self.config.mode == "benchmark":
  83. total_tcp_sockets = subprocess.check_output("ss -s | grep TCP: | awk '{print $2}'", shell=True, text=True)
  84. log("Total TCP sockets: " + total_tcp_sockets, prefix=prefix, file=file)
  85. if int(total_tcp_sockets) > 5000:
  86. # Sleep for 60 seconds to ensure all host connects are closed
  87. log("Clean up: Sleep 60 seconds...", prefix=prefix, file=file)
  88. time.sleep(60)
  89. # After benchmarks are complete for all test types in this test,
  90. # let's clean up leftover test images (techempower/tfb.test.test-name)
  91. self.docker_helper.clean()
  92. return success
    def __run_test(self, test, benchmark_log):
        '''
        Runs the given test, verifies that the webapp is accepting requests,
        optionally benchmarks the webapp, and ultimately stops all services
        started for this test.

        Returns True on success; False when the test is excluded, a container
        fails to start, the app never accepts requests, verification fails in
        "verify" mode, or any exception escapes the run.
        '''
        log_prefix = "%s: " % test.name
        # Start timing the total test duration
        self.time_logger.mark_test_start()

        if self.config.mode == "benchmark":
            log("Benchmarking %s" % test.name,
                file=benchmark_log,
                border='-')

        # If the test is in the excludes list, we skip it
        if self.config.exclude and test.name in self.config.exclude:
            message = "Test {name} has been added to the excludes list. Skipping.".format(
                name=test.name)
            self.results.write_intermediate(test.name, message)
            self.results.upload()
            return self.__exit_test(
                success=False,
                message=message,
                prefix=log_prefix,
                file=benchmark_log)

        database_container = None
        try:
            # Start database container ("none" means the test needs no database)
            if test.database.lower() != "none":
                self.time_logger.mark_starting_database()
                database_container = self.docker_helper.start_database(
                    test.database.lower())
                if database_container is None:
                    message = "ERROR: Problem building/running database container"
                    self.results.write_intermediate(test.name, message)
                    self.results.upload()
                    return self.__exit_test(
                        success=False,
                        message=message,
                        prefix=log_prefix,
                        file=benchmark_log)
                self.time_logger.mark_started_database()

            # Start webapp
            container = test.start()
            self.time_logger.mark_test_starting()
            if container is None:
                message = "ERROR: Problem starting {name}".format(
                    name=test.name)
                self.results.write_intermediate(test.name, message)
                self.results.upload()
                return self.__exit_test(
                    success=False,
                    message=message,
                    prefix=log_prefix,
                    file=benchmark_log)

            # Poll once per second (60s budget) until the webapp answers, the
            # deadline passes, or its container disappears.
            max_time = time.time() + 60
            while True:
                accepting_requests = test.is_accepting_requests()
                if accepting_requests \
                        or time.time() >= max_time \
                        or not self.docker_helper.server_container_exists(container.id):
                    break
                time.sleep(1)

            # Optional per-test warm-up delay (integer seconds) before any
            # client requests are sent.
            if hasattr(test, 'wait_before_sending_requests') and isinstance(test.wait_before_sending_requests, numbers.Integral) and test.wait_before_sending_requests > 0:
                time.sleep(test.wait_before_sending_requests)

            if not accepting_requests:
                message = "ERROR: Framework is not accepting requests from client machine"
                self.results.write_intermediate(test.name, message)
                self.results.upload()
                return self.__exit_test(
                    success=False,
                    message=message,
                    prefix=log_prefix,
                    file=benchmark_log)

            self.time_logger.mark_test_accepting_requests()

            # Debug mode blocks execution here until ctrl+c
            if self.config.mode == "debug":
                msg = "Entering debug mode. Server http://localhost:%s has started. CTRL-c to stop.\r\n" % test.port
                msg = msg + "From outside vagrant: http://localhost:%s" % (int(test.port) + 20000)
                log(msg,
                    prefix=log_prefix,
                    file=benchmark_log,
                    color=Fore.YELLOW)
                # Deliberate infinite loop: only SIGINT (handled by stop()) ends it.
                while True:
                    time.sleep(1)

            # Verify URLs and audit
            log("Verifying framework URLs", prefix=log_prefix)
            self.time_logger.mark_verify_start()
            passed_verify = test.verify_urls()
            self.audit.audit_test_dir(test.directory)

            # Benchmark this test
            if self.config.mode == "benchmark":
                self.time_logger.mark_benchmarking_start()
                self.__benchmark(test, benchmark_log)
                self.time_logger.log_benchmarking_end(
                    log_prefix=log_prefix, file=benchmark_log)

            # Log test timing stats
            self.time_logger.log_build_flush(benchmark_log)
            self.time_logger.log_database_start_time(log_prefix, benchmark_log)
            self.time_logger.log_test_accepting_requests(
                log_prefix, benchmark_log)
            self.time_logger.log_verify_end(log_prefix, benchmark_log)

            # Save results thus far into the latest results directory
            self.results.write_intermediate(test.name,
                                            time.strftime(
                                                "%Y%m%d%H%M%S",
                                                time.localtime()))

            # Upload the results thus far to another server (optional)
            self.results.upload()

            if self.config.mode == "verify" and not passed_verify:
                return self.__exit_test(
                    success=False,
                    message="Failed verify!",
                    prefix=log_prefix,
                    file=benchmark_log)
        except Exception as e:
            tb = traceback.format_exc()
            self.results.write_intermediate(test.name,
                                            "error during test: " + str(e))
            self.results.upload()
            log(tb, prefix=log_prefix, file=benchmark_log)
            return self.__exit_test(
                success=False,
                message="Error during test: %s" % test.name,
                prefix=log_prefix,
                file=benchmark_log)
        finally:
            # Always tear down every container this test started, even on an
            # early return or exception above.
            self.docker_helper.stop()

        return self.__exit_test(
            success=True, prefix=log_prefix, file=benchmark_log)
  222. def __benchmark(self, framework_test, benchmark_log):
  223. '''
  224. Runs the benchmark for each type of test that it implements
  225. '''
  226. def benchmark_type(test_type):
  227. log("BENCHMARKING %s ... " % test_type.upper(), file=benchmark_log)
  228. test = framework_test.runTests[test_type]
  229. if not test.failed:
  230. # Begin resource usage metrics collection
  231. self.__begin_logging(framework_test, test_type)
  232. script = self.config.types[test_type].get_script_name()
  233. script_variables = self.config.types[
  234. test_type].get_script_variables(
  235. test.name, "http://%s:%s%s" % (self.config.server_host,
  236. framework_test.port,
  237. test.get_url()))
  238. benchmark_container = self.docker_helper.benchmark(script, script_variables)
  239. self.__log_container_output(benchmark_container, framework_test, test_type)
  240. # End resource usage metrics collection
  241. self.__end_logging()
  242. results = self.results.parse_test(framework_test, test_type)
  243. log("Benchmark results:", file=benchmark_log)
  244. # TODO move into log somehow
  245. pprint(results)
  246. self.results.report_benchmark_results(framework_test, test_type,
  247. results['results'])
  248. log("Complete", file=benchmark_log)
  249. for test_type in framework_test.runTests:
  250. benchmark_type(test_type)
  251. def __begin_logging(self, framework_test, test_type):
  252. '''
  253. Starts a thread to monitor the resource usage, to be synced with the
  254. client's time.
  255. TODO: MySQL and InnoDB are possible. Figure out how to implement them.
  256. '''
  257. output_file = "{file_name}".format(
  258. file_name=self.results.get_stats_file(framework_test.name,
  259. test_type))
  260. dool_string = "dool -Tafilmprs --aio --fs --ipc --lock --socket --tcp \
  261. --raw --udp --unix --vm --disk-util \
  262. --rpc --rpcd --output {output_file}".format(
  263. output_file=output_file)
  264. cmd = shlex.split(dool_string)
  265. self.subprocess_handle = subprocess.Popen(
  266. cmd, stdout=FNULL, stderr=subprocess.STDOUT)
  267. def __end_logging(self):
  268. '''
  269. Stops the logger thread and blocks until shutdown is complete.
  270. '''
  271. self.subprocess_handle.terminate()
  272. self.subprocess_handle.communicate()
  273. def __log_container_output(self, container: Container, framework_test, test_type) -> None:
  274. def save_docker_logs(stream):
  275. raw_file_path = self.results.get_raw_file(framework_test.name, test_type)
  276. with open(raw_file_path, 'w') as file:
  277. for line in stream:
  278. log(line.decode(), file=file)
  279. def save_docker_stats(stream):
  280. docker_file_path = self.results.get_docker_stats_file(framework_test.name, test_type)
  281. with open(docker_file_path, 'w') as file:
  282. file.write('[\n')
  283. is_first_line = True
  284. for line in stream:
  285. if is_first_line:
  286. is_first_line = False
  287. else:
  288. file.write(',')
  289. file.write(line.decode())
  290. file.write(']')
  291. threads = [
  292. threading.Thread(target=lambda: save_docker_logs(container.logs(stream=True))),
  293. threading.Thread(target=lambda: save_docker_stats(container.stats(stream=True)))
  294. ]
  295. [thread.start() for thread in threads]
  296. [thread.join() for thread in threads]