# results.py
  1. from toolset.utils.output_helper import log
  2. from toolset.test_types import test_types
  3. import os
  4. import subprocess
  5. import uuid
  6. import time
  7. import json
  8. import requests
  9. import threading
  10. import re
  11. import math
  12. import csv
  13. import traceback
  14. from datetime import datetime
  15. # Cross-platform colored text
  16. from colorama import Fore, Style
  17. class ByteEncoder(json.JSONEncoder):
  18. def default(self, obj):
  19. if isinstance(obj, bytes):
  20. return obj.decode()
  21. return super().default(obj)
  22. class Results:
  23. def __init__(self, benchmarker):
  24. '''
  25. Constructor
  26. '''
  27. self.benchmarker = benchmarker
  28. self.config = benchmarker.config
  29. self.directory = os.path.join(self.config.results_root,
  30. self.config.timestamp)
  31. try:
  32. os.makedirs(self.directory)
  33. except OSError:
  34. pass
  35. self.file = os.path.join(self.directory, "results.json")
  36. self.uuid = str(uuid.uuid4())
  37. self.name = datetime.now().strftime(self.config.results_name)
  38. self.environmentDescription = self.config.results_environment
  39. try:
  40. self.git = dict()
  41. subprocess.call('git config --global --add safe.directory {}'.format(self.config.fw_root),
  42. shell=True,
  43. cwd=self.config.fw_root)
  44. self.git['commitId'] = self.__get_git_commit_id()
  45. self.git['repositoryUrl'] = self.__get_git_repository_url()
  46. self.git['branchName'] = self.__get_git_branch_name()
  47. except Exception:
  48. #Could not read local git repository, which is fine.
  49. self.git = None
  50. self.startTime = int(round(time.time() * 1000))
  51. self.completionTime = None
  52. self.concurrencyLevels = self.config.concurrency_levels
  53. self.pipelineConcurrencyLevels = self.config.pipeline_concurrency_levels
  54. self.queryIntervals = self.config.query_levels
  55. self.cachedQueryIntervals = self.config.cached_query_levels
  56. self.frameworks = [t.name for t in benchmarker.tests]
  57. self.duration = self.config.duration
  58. self.rawData = dict()
  59. self.completed = dict()
  60. self.succeeded = dict()
  61. self.failed = dict()
  62. self.verify = dict()
  63. for type in test_types:
  64. self.rawData[type] = dict()
  65. self.failed[type] = []
  66. self.succeeded[type] = []
  67. #############################################################################
  68. # PUBLIC FUNCTIONS
  69. #############################################################################
  70. def parse(self, tests):
  71. '''
  72. Ensures that the system has all necessary software to run
  73. the tests. This does not include that software for the individual
  74. test, but covers software such as curl and weighttp that
  75. are needed.
  76. '''
  77. # Run the method to get the commmit count of each framework.
  78. self.__count_commits()
  79. # Call the method which counts the sloc for each framework
  80. self.__count_sloc()
  81. # Time to create parsed files
  82. # Aggregate JSON file
  83. with open(self.file, "w") as f:
  84. f.write(json.dumps(self.__to_jsonable(), indent=2))
  85. def parse_test(self, framework_test, test_type):
  86. '''
  87. Parses the given test and test_type from the raw_file.
  88. '''
  89. results = dict()
  90. results['results'] = []
  91. stats = []
  92. if os.path.exists(self.get_raw_file(framework_test.name, test_type)):
  93. with open(self.get_raw_file(framework_test.name,
  94. test_type)) as raw_data:
  95. is_warmup = True
  96. rawData = None
  97. for line in raw_data:
  98. if "Queries:" in line or "Concurrency:" in line:
  99. is_warmup = False
  100. rawData = None
  101. continue
  102. if "Warmup" in line or "Primer" in line:
  103. is_warmup = True
  104. continue
  105. if not is_warmup:
  106. if rawData is None:
  107. rawData = dict()
  108. results['results'].append(rawData)
  109. if "Latency" in line:
  110. m = re.findall(r"([0-9]+\.*[0-9]*[us|ms|s|m|%]+)",
  111. line)
  112. if len(m) == 4:
  113. rawData['latencyAvg'] = m[0]
  114. rawData['latencyStdev'] = m[1]
  115. rawData['latencyMax'] = m[2]
  116. if "requests in" in line:
  117. m = re.search("([0-9]+) requests in", line)
  118. if m is not None:
  119. rawData['totalRequests'] = int(m.group(1))
  120. if "Socket errors" in line:
  121. if "connect" in line:
  122. m = re.search("connect ([0-9]+)", line)
  123. rawData['connect'] = int(m.group(1))
  124. if "read" in line:
  125. m = re.search("read ([0-9]+)", line)
  126. rawData['read'] = int(m.group(1))
  127. if "write" in line:
  128. m = re.search("write ([0-9]+)", line)
  129. rawData['write'] = int(m.group(1))
  130. if "timeout" in line:
  131. m = re.search("timeout ([0-9]+)", line)
  132. rawData['timeout'] = int(m.group(1))
  133. if "Non-2xx" in line:
  134. m = re.search("Non-2xx or 3xx responses: ([0-9]+)",
  135. line)
  136. if m != None:
  137. rawData['5xx'] = int(m.group(1))
  138. if "STARTTIME" in line:
  139. m = re.search("[0-9]+", line)
  140. rawData["startTime"] = int(m.group(0))
  141. if "ENDTIME" in line:
  142. m = re.search("[0-9]+", line)
  143. rawData["endTime"] = int(m.group(0))
  144. test_stats = self.__parse_stats(
  145. framework_test, test_type,
  146. rawData["startTime"], rawData["endTime"], 1)
  147. stats.append(test_stats)
  148. with open(
  149. self.get_stats_file(framework_test.name, test_type) + ".json",
  150. "w") as stats_file:
  151. json.dump(stats, stats_file, indent=2)
  152. return results
  153. def parse_all(self, framework_test):
  154. '''
  155. Method meant to be run for a given timestamp
  156. '''
  157. for test_type in framework_test.runTests:
  158. if os.path.exists(
  159. self.get_raw_file(framework_test.name, test_type)):
  160. results = self.parse_test(framework_test, test_type)
  161. self.report_benchmark_results(framework_test, test_type,
  162. results['results'])
  163. def write_intermediate(self, test_name, status_message):
  164. '''
  165. Writes the intermediate results for the given test_name and status_message
  166. '''
  167. self.completed[test_name] = status_message
  168. self.__write_results()
  169. def set_completion_time(self):
  170. '''
  171. Sets the completionTime for these results and writes the results
  172. '''
  173. self.completionTime = int(round(time.time() * 1000))
  174. self.__write_results()
  175. def upload(self):
  176. '''
  177. Attempts to upload the results.json to the configured results_upload_uri
  178. '''
  179. if self.config.results_upload_uri is not None:
  180. try:
  181. requests.post(
  182. self.config.results_upload_uri,
  183. headers={'Content-Type': 'application/json'},
  184. data=json.dumps(self.__to_jsonable(), indent=2),
  185. timeout=300)
  186. except Exception:
  187. log("Error uploading results.json")
  188. def load(self):
  189. '''
  190. Load the results.json file
  191. '''
  192. try:
  193. with open(self.file) as f:
  194. self.__dict__.update(json.load(f))
  195. except (ValueError, IOError):
  196. pass
  197. def __make_dir_for_file(self, test_name: str, test_type: str, file_name: str):
  198. path = os.path.join(self.directory, test_name, test_type, file_name)
  199. try:
  200. os.makedirs(os.path.dirname(path), exist_ok=True)
  201. except OSError:
  202. pass
  203. return path
  204. def get_docker_stats_file(self, test_name, test_type):
  205. '''
  206. Returns the stats file name for this test_name and
  207. Example: fw_root/results/timestamp/test_type/test_name/stats.txt
  208. '''
  209. return self.__make_dir_for_file(test_name, test_type, "docker_stats.json")
  210. def get_raw_file(self, test_name, test_type):
  211. '''
  212. Returns the output file for this test_name and test_type
  213. Example: fw_root/results/timestamp/test_type/test_name/raw.txt
  214. '''
  215. return self.__make_dir_for_file(test_name, test_type, "raw.txt")
  216. def get_stats_file(self, test_name, test_type):
  217. '''
  218. Returns the stats file name for this test_name and
  219. Example: fw_root/results/timestamp/test_type/test_name/stats.txt
  220. '''
  221. return self.__make_dir_for_file(test_name, test_type, "stats.txt")
  222. def report_verify_results(self, framework_test, test_type, result):
  223. '''
  224. Used by FrameworkTest to add verification details to our results
  225. TODO: Technically this is an IPC violation - we are accessing
  226. the parent process' memory from the child process
  227. '''
  228. if framework_test.name not in self.verify.keys():
  229. self.verify[framework_test.name] = dict()
  230. self.verify[framework_test.name][test_type] = result
  231. def report_benchmark_results(self, framework_test, test_type, results):
  232. '''
  233. Used by FrameworkTest to add benchmark data to this
  234. TODO: Technically this is an IPC violation - we are accessing
  235. the parent process' memory from the child process
  236. '''
  237. if test_type not in self.rawData.keys():
  238. self.rawData[test_type] = dict()
  239. # If results has a size from the parse, then it succeeded.
  240. if results:
  241. self.rawData[test_type][framework_test.name] = results
  242. # This may already be set for single-tests
  243. if framework_test.name not in self.succeeded[test_type]:
  244. self.succeeded[test_type].append(framework_test.name)
  245. else:
  246. # This may already be set for single-tests
  247. if framework_test.name not in self.failed[test_type]:
  248. self.failed[test_type].append(framework_test.name)
  249. def finish(self):
  250. '''
  251. Finishes these results.
  252. '''
  253. if not self.config.parse:
  254. # Normally you don't have to use Fore.BLUE before each line, but
  255. # Travis-CI seems to reset color codes on newline (see travis-ci/travis-ci#2692)
  256. # or stream flush, so we have to ensure that the color code is printed repeatedly
  257. log("Verification Summary",
  258. border='=',
  259. border_bottom='-',
  260. color=Fore.CYAN)
  261. for test in self.benchmarker.tests:
  262. log(Fore.CYAN + "| {!s}".format(test.name))
  263. if test.name in self.verify.keys():
  264. for test_type, result in self.verify[
  265. test.name].items():
  266. if result.upper() == "PASS":
  267. color = Fore.GREEN
  268. elif result.upper() == "WARN":
  269. color = Fore.YELLOW
  270. else:
  271. color = Fore.RED
  272. log(Fore.CYAN + "| " + test_type.ljust(13) +
  273. ' : ' + color + result.upper())
  274. else:
  275. log(Fore.CYAN + "| " + Fore.RED +
  276. "NO RESULTS (Did framework launch?)")
  277. log('', border='=', border_bottom='', color=Fore.CYAN)
  278. log("Results are saved in " + self.directory)
  279. #############################################################################
  280. # PRIVATE FUNCTIONS
  281. #############################################################################
  282. def __to_jsonable(self):
  283. '''
  284. Returns a dict suitable for jsonification
  285. '''
  286. toRet = dict()
  287. toRet['uuid'] = self.uuid
  288. toRet['name'] = self.name
  289. toRet['environmentDescription'] = self.environmentDescription
  290. toRet['git'] = self.git
  291. toRet['startTime'] = self.startTime
  292. toRet['completionTime'] = self.completionTime
  293. toRet['concurrencyLevels'] = self.concurrencyLevels
  294. toRet['pipelineConcurrencyLevels'] = self.pipelineConcurrencyLevels
  295. toRet['queryIntervals'] = self.queryIntervals
  296. toRet['cachedQueryIntervals'] = self.cachedQueryIntervals
  297. toRet['frameworks'] = self.frameworks
  298. toRet['duration'] = self.duration
  299. toRet['rawData'] = self.rawData
  300. toRet['completed'] = self.completed
  301. toRet['succeeded'] = self.succeeded
  302. toRet['failed'] = self.failed
  303. toRet['verify'] = self.verify
  304. toRet['testMetadata'] = self.benchmarker.metadata.to_jsonable()
  305. return toRet
  306. def __write_results(self):
  307. try:
  308. with open(self.file, 'w') as f:
  309. f.write(json.dumps(self.__to_jsonable(), indent=2, cls=ByteEncoder))
  310. except IOError:
  311. log("Error writing results.json")
  312. def __count_sloc(self):
  313. '''
  314. Counts the significant lines of code for all tests and stores in results.
  315. '''
  316. frameworks = self.benchmarker.metadata.gather_frameworks(
  317. self.config.test, self.config.exclude)
  318. framework_to_count = {}
  319. for framework, testlist in frameworks.items():
  320. wd = testlist[0].directory
  321. # Find the last instance of the word 'code' in the yaml output. This
  322. # should be the line count for the sum of all listed files or just
  323. # the line count for the last file in the case where there's only
  324. # one file listed.
  325. command = "cloc --yaml --follow-links . | grep code | tail -1 | cut -d: -f 2"
  326. log("Running \"%s\" (cwd=%s)" % (command, wd))
  327. try:
  328. line_count = int(subprocess.check_output(command, cwd=wd, shell=True))
  329. except (subprocess.CalledProcessError, ValueError) as e:
  330. log("Unable to count lines of code for %s due to error '%s'" %
  331. (framework, e))
  332. continue
  333. log("Counted %s lines of code" % line_count)
  334. framework_to_count[framework] = line_count
  335. self.rawData['slocCounts'] = framework_to_count
  336. def __count_commits(self):
  337. '''
  338. Count the git commits for all the framework tests
  339. '''
  340. frameworks = self.benchmarker.metadata.gather_frameworks(
  341. self.config.test, self.config.exclude)
  342. def count_commit(directory, jsonResult):
  343. command = "git rev-list HEAD -- " + directory + " | sort -u | wc -l"
  344. try:
  345. commitCount = subprocess.check_output(command, shell=True)
  346. jsonResult[framework] = int(commitCount)
  347. except subprocess.CalledProcessError:
  348. pass
  349. # Because git can be slow when run in large batches, this
  350. # calls git up to 4 times in parallel. Normal improvement is ~3-4x
  351. # in my trials, or ~100 seconds down to ~25
  352. # This is safe to parallelize as long as each thread only
  353. # accesses one key in the dictionary
  354. threads = []
  355. jsonResult = {}
  356. # t1 = datetime.now()
  357. for framework, testlist in frameworks.items():
  358. directory = testlist[0].directory
  359. t = threading.Thread(
  360. target=count_commit, args=(directory, jsonResult))
  361. t.start()
  362. threads.append(t)
  363. # Git has internal locks, full parallel will just cause contention
  364. # and slowness, so we rate-limit a bit
  365. if len(threads) >= 4:
  366. threads[0].join()
  367. threads.remove(threads[0])
  368. # Wait for remaining threads
  369. for t in threads:
  370. t.join()
  371. # t2 = datetime.now()
  372. # print "Took %s seconds " % (t2 - t1).seconds
  373. self.rawData['commitCounts'] = jsonResult
  374. self.config.commits = jsonResult
  375. def __get_git_commit_id(self):
  376. '''
  377. Get the git commit id for this benchmark
  378. '''
  379. return subprocess.check_output(
  380. ["git", "rev-parse", "HEAD"], cwd=self.config.fw_root).strip()
  381. def __get_git_repository_url(self):
  382. '''
  383. Gets the git repository url for this benchmark
  384. '''
  385. return subprocess.check_output(
  386. ["git", "config", "--get", "remote.origin.url"],
  387. cwd=self.config.fw_root).strip()
  388. def __get_git_branch_name(self):
  389. '''
  390. Gets the git branch name for this benchmark
  391. '''
  392. return subprocess.check_output(
  393. 'git rev-parse --abbrev-ref HEAD',
  394. shell=True,
  395. cwd=self.config.fw_root).strip()
  396. def __parse_stats(self, framework_test, test_type, start_time, end_time,
  397. interval):
  398. '''
  399. For each test type, process all the statistics, and return a multi-layered
  400. dictionary that has a structure as follows:
  401. (timestamp)
  402. | (main header) - group that the stat is in
  403. | | (sub header) - title of the stat
  404. | | | (stat) - the stat itself, usually a floating point number
  405. '''
  406. stats_dict = dict()
  407. stats_file = self.get_stats_file(framework_test.name, test_type)
  408. with open(stats_file) as stats:
  409. # dool doesn't output a completely compliant CSV file - we need to strip the header
  410. for _ in range(4):
  411. next(stats)
  412. stats_reader = csv.reader(stats)
  413. main_header = next(stats_reader)
  414. sub_header = next(stats_reader)
  415. time_row = sub_header.index("epoch")
  416. int_counter = 0
  417. for row in stats_reader:
  418. time = float(row[time_row])
  419. int_counter += 1
  420. if time < start_time:
  421. continue
  422. elif time > end_time:
  423. return stats_dict
  424. if int_counter % interval != 0:
  425. continue
  426. row_dict = dict()
  427. for nextheader in main_header:
  428. if nextheader != "":
  429. row_dict[nextheader] = dict()
  430. header = ""
  431. for item_num, column in enumerate(row):
  432. if len(main_header[item_num]) != 0:
  433. header = main_header[item_num]
  434. # all the stats are numbers, so we want to make sure that they stay that way in json
  435. row_dict[header][sub_header[item_num]] = float(column)
  436. stats_dict[time] = row_dict
  437. return stats_dict
  438. def __calculate_average_stats(self, raw_stats):
  439. '''
  440. We have a large amount of raw data for the statistics that may be useful
  441. for the stats nerds, but most people care about a couple of numbers. For
  442. now, we're only going to supply:
  443. * Average CPU
  444. * Average Memory
  445. * Total network use
  446. * Total disk use
  447. More may be added in the future. If they are, please update the above list.
  448. Note: raw_stats is directly from the __parse_stats method.
  449. Recall that this consists of a dictionary of timestamps, each of which
  450. contain a dictionary of stat categories which contain a dictionary of stats
  451. '''
  452. raw_stat_collection = dict()
  453. for time_dict in raw_stats.items()[1]:
  454. for main_header, sub_headers in time_dict.items():
  455. item_to_append = None
  456. if 'cpu' in main_header:
  457. # We want to take the idl stat and subtract it from 100
  458. # to get the time that the CPU is NOT idle.
  459. item_to_append = sub_headers['idl'] - 100.0
  460. elif main_header == 'memory usage':
  461. item_to_append = sub_headers['used']
  462. elif 'net' in main_header:
  463. # Network stats have two parts - recieve and send. We'll use a tuple of
  464. # style (recieve, send)
  465. item_to_append = (sub_headers['recv'], sub_headers['send'])
  466. elif 'dsk' or 'io' in main_header:
  467. # Similar for network, except our tuple looks like (read, write)
  468. item_to_append = (sub_headers['read'], sub_headers['writ'])
  469. if item_to_append is not None:
  470. if main_header not in raw_stat_collection:
  471. raw_stat_collection[main_header] = list()
  472. raw_stat_collection[main_header].append(item_to_append)
  473. # Simple function to determine human readable size
  474. # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
  475. def sizeof_fmt(num):
  476. # We'll assume that any number we get is convertable to a float, just in case
  477. num = float(num)
  478. for x in ['bytes', 'KB', 'MB', 'GB']:
  479. if 1024.0 > num > -1024.0:
  480. return "%3.1f%s" % (num, x)
  481. num /= 1024.0
  482. return "%3.1f%s" % (num, 'TB')
  483. # Now we have our raw stats in a readable format - we need to format it for display
  484. # We need a floating point sum, so the built in sum doesn't cut it
  485. display_stat_collection = dict()
  486. for header, values in raw_stat_collection.items():
  487. display_stat = None
  488. if 'cpu' in header:
  489. display_stat = sizeof_fmt(math.fsum(values) / len(values))
  490. elif main_header == 'memory usage':
  491. display_stat = sizeof_fmt(math.fsum(values) / len(values))
  492. elif 'net' in main_header:
  493. receive, send = zip(*values) # unzip
  494. display_stat = {
  495. 'receive': sizeof_fmt(math.fsum(receive)),
  496. 'send': sizeof_fmt(math.fsum(send))
  497. }
  498. else: # if 'dsk' or 'io' in header:
  499. read, write = zip(*values) # unzip
  500. display_stat = {
  501. 'read': sizeof_fmt(math.fsum(read)),
  502. 'write': sizeof_fmt(math.fsum(write))
  503. }
  504. display_stat_collection[header] = display_stat
  505. return display_stat