results_helper.py

from toolset.utils.metadata_helper import gather_remaining_tests, gather_frameworks
from toolset.utils.output_helper import log, FNULL
import os
import subprocess
import uuid
import time
import json
import requests
import threading
import re
import math
import csv
from datetime import datetime

# Cross-platform colored text
from colorama import Fore, Style


class Results:
    def __init__(self, config):
        '''
        Constructor
        '''
        self.config = config
        self.directory = os.path.join(self.config.fwroot, "results",
                                      self.config.timestamp)
        try:
            os.makedirs(self.directory)
        except OSError:
            pass
        self.file = os.path.join(self.directory, "results.json")

        self.uuid = str(uuid.uuid4())
        self.name = datetime.now().strftime(self.config.results_name)
        self.environmentDescription = self.config.results_environment
        try:
            self.git = dict()
            self.git['commitId'] = self.__get_git_commit_id()
            self.git['repositoryUrl'] = self.__get_git_repository_url()
            self.git['branchName'] = self.__get_git_branch_name()
        except Exception:
            # Could not read local git repository, which is fine.
            self.git = None
        self.startTime = int(round(time.time() * 1000))
        self.completionTime = None
        self.concurrencyLevels = self.config.concurrency_levels
        self.pipelineConcurrencyLevels = self.config.pipeline_concurrency_levels
        self.queryIntervals = self.config.query_levels
        self.cachedQueryIntervals = self.config.cached_query_levels
        self.frameworks = [
            t.name for t in gather_remaining_tests(self.config, self)
        ]
        self.duration = self.config.duration
        self.rawData = dict()
        self.rawData['json'] = dict()
        self.rawData['db'] = dict()
        self.rawData['query'] = dict()
        self.rawData['fortune'] = dict()
        self.rawData['update'] = dict()
        self.rawData['plaintext'] = dict()
        self.rawData['cached_query'] = dict()
        self.completed = dict()
        self.succeeded = dict()
        self.succeeded['json'] = []
        self.succeeded['db'] = []
        self.succeeded['query'] = []
        self.succeeded['fortune'] = []
        self.succeeded['update'] = []
        self.succeeded['plaintext'] = []
        self.succeeded['cached_query'] = []
        self.failed = dict()
        self.failed['json'] = []
        self.failed['db'] = []
        self.failed['query'] = []
        self.failed['fortune'] = []
        self.failed['update'] = []
        self.failed['plaintext'] = []
        self.failed['cached_query'] = []
        self.verify = dict()

    #############################################################################
    # PUBLIC FUNCTIONS
    #############################################################################

    def parse(self, tests):
        '''
        Parses the raw benchmark output for the given tests, records the
        commit and SLOC counts for each framework, and writes the aggregate
        results.json file.
        '''
        # Run the method to get the commit count of each framework.
        self.__count_commits()
        # Call the method which counts the sloc for each framework
        self.__count_sloc()

        # Time to create parsed files
        # Aggregate JSON file
        with open(self.file, "w") as f:
            f.write(json.dumps(self.__to_jsonable(), indent=2))

    def parse_test(self, framework_test, test_type):
        '''
        Parses the given test and test_type from the raw output file.
        '''
        try:
            results = dict()
            results['results'] = []
            stats = []

            if os.path.exists(
                    self.get_raw_file(framework_test.name, test_type)):
                with open(self.get_raw_file(framework_test.name,
                                            test_type)) as raw_data:

                    is_warmup = True
                    rawData = None
                    for line in raw_data:
                        if "Queries:" in line or "Concurrency:" in line:
                            is_warmup = False
                            rawData = None
                            continue
                        if "Warmup" in line or "Primer" in line:
                            is_warmup = True
                            continue
                        if not is_warmup:
                            if rawData is None:
                                rawData = dict()
                                results['results'].append(rawData)
                            if "Latency" in line:
                                m = re.findall(
                                    r"([0-9]+\.*[0-9]*(?:us|ms|s|m|%))", line)
                                if len(m) == 4:
                                    rawData['latencyAvg'] = m[0]
                                    rawData['latencyStdev'] = m[1]
                                    rawData['latencyMax'] = m[2]
                            if "requests in" in line:
                                m = re.search("([0-9]+) requests in", line)
                                if m is not None:
                                    rawData['totalRequests'] = int(m.group(1))
                            if "Socket errors" in line:
                                if "connect" in line:
                                    m = re.search("connect ([0-9]+)", line)
                                    rawData['connect'] = int(m.group(1))
                                if "read" in line:
                                    m = re.search("read ([0-9]+)", line)
                                    rawData['read'] = int(m.group(1))
                                if "write" in line:
                                    m = re.search("write ([0-9]+)", line)
                                    rawData['write'] = int(m.group(1))
                                if "timeout" in line:
                                    m = re.search("timeout ([0-9]+)", line)
                                    rawData['timeout'] = int(m.group(1))
                            if "Non-2xx" in line:
                                m = re.search(
                                    "Non-2xx or 3xx responses: ([0-9]+)",
                                    line)
                                if m is not None:
                                    rawData['5xx'] = int(m.group(1))
                            if "STARTTIME" in line:
                                m = re.search("[0-9]+", line)
                                rawData["startTime"] = int(m.group(0))
                            if "ENDTIME" in line:
                                m = re.search("[0-9]+", line)
                                rawData["endTime"] = int(m.group(0))
                                test_stats = self.__parse_stats(
                                    framework_test, test_type,
                                    rawData["startTime"], rawData["endTime"],
                                    1)
                                stats.append(test_stats)
            with open(
                    self.get_stats_file(framework_test.name, test_type) +
                    ".json", "w") as stats_file:
                json.dump(stats, stats_file, indent=2)

            return results
        except IOError:
            return None
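
    # For reference, the raw.txt consumed above is the captured output of the
    # benchmark shell script. One measurement section looks roughly like the
    # following wrk output (an illustrative excerpt, not verbatim output;
    # numbers are placeholders, and the STARTTIME/ENDTIME epoch markers are
    # written by the script around each run):
    #
    #   STARTTIME 1500000000
    #    Concurrency: 256 for json
    #     Thread Stats   Avg      Stdev     Max   +/- Stdev
    #       Latency     1.21ms    2.31ms   45.67ms   91.12%
    #     1000000 requests in 15.00s, 120.00MB read
    #     Socket errors: connect 0, read 0, write 0, timeout 5
    #   ENDTIME 1500000015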

    def parse_all(self, framework_test):
        '''
        Parses the raw output of every test type that was run for the given
        framework test and reports the benchmark results.
        '''
        for test_type in framework_test.runTests:
            if os.path.exists(
                    self.get_raw_file(framework_test.name, test_type)):
                results = self.parse_test(framework_test, test_type)
                self.report_benchmark_results(framework_test, test_type,
                                              results['results'])

    def write_intermediate(self, test_name, status_message):
        '''
        Writes the intermediate results for the given test_name and status_message
        '''
        self.completed[test_name] = status_message
        self.__write_results()

    def set_completion_time(self):
        '''
        Sets the completionTime for these results and writes the results
        '''
        self.completionTime = int(round(time.time() * 1000))
        self.__write_results()

    def upload(self):
        '''
        Attempts to upload the results.json to the configured results_upload_uri
        '''
        if self.config.results_upload_uri is not None:
            try:
                requests.post(
                    self.config.results_upload_uri,
                    headers={'Content-Type': 'application/json'},
                    data=json.dumps(self.__to_jsonable(), indent=2))
            except Exception:
                log("Error uploading results.json")

    def load(self):
        '''
        Load the results.json file
        '''
        try:
            with open(self.file) as f:
                self.__dict__.update(json.load(f))
        except (ValueError, IOError):
            pass

    def get_raw_file(self, test_name, test_type):
        '''
        Returns the raw output file for this test_name and test_type
        Example: fwroot/results/timestamp/test_name/test_type/raw.txt
        '''
        path = os.path.join(self.directory, test_name, test_type, "raw.txt")
        try:
            os.makedirs(os.path.dirname(path))
        except OSError:
            pass
        return path

    def get_stats_file(self, test_name, test_type):
        '''
        Returns the stats file for this test_name and test_type
        Example: fwroot/results/timestamp/test_name/test_type/stats.txt
        '''
        path = os.path.join(self.directory, test_name, test_type, "stats.txt")
        try:
            os.makedirs(os.path.dirname(path))
        except OSError:
            pass
        return path

    def report_verify_results(self, framework_test, test_type, result):
        '''
        Used by FrameworkTest to add verification details to these results

        TODO: Technically this is an IPC violation - we are accessing
        the parent process' memory from the child process
        '''
        if framework_test.name not in self.verify:
            self.verify[framework_test.name] = dict()
        self.verify[framework_test.name][test_type] = result

    def report_benchmark_results(self, framework_test, test_type, results):
        '''
        Used by FrameworkTest to add benchmark data to these results

        TODO: Technically this is an IPC violation - we are accessing
        the parent process' memory from the child process
        '''
        if test_type not in self.rawData:
            self.rawData[test_type] = dict()

        # If results has a size from the parse, then it succeeded.
        if results:
            self.rawData[test_type][framework_test.name] = results

            # This may already be set for single-tests
            if framework_test.name not in self.succeeded[test_type]:
                self.succeeded[test_type].append(framework_test.name)
        else:
            # This may already be set for single-tests
            if framework_test.name not in self.failed[test_type]:
                self.failed[test_type].append(framework_test.name)

    def finish(self):
        '''
        Finishes these results.
        '''
        if not self.config.parse:
            tests = gather_remaining_tests(self.config, self)
            # Normally you don't have to use Fore.BLUE before each line, but
            # Travis-CI seems to reset color codes on newline (see
            # travis-ci/travis-ci#2692) or stream flush, so we have to ensure
            # that the color code is printed repeatedly
            log("Verification Summary",
                border='=',
                border_bottom='-',
                color=Fore.CYAN)
            for test in tests:
                log(Fore.CYAN + "| {!s}".format(test.name))
                if test.name in self.verify:
                    for test_type, result in self.verify[test.name].items():
                        if result.upper() == "PASS":
                            color = Fore.GREEN
                        elif result.upper() == "WARN":
                            color = Fore.YELLOW
                        else:
                            color = Fore.RED
                        log(Fore.CYAN + "| " + test_type.ljust(13) + ' : ' +
                            color + result.upper())
                else:
                    log(Fore.CYAN + "| " + Fore.RED +
                        "NO RESULTS (Did framework launch?)")
            log('', border='=', border_bottom='', color=Fore.CYAN)

        log("%sTime to complete: %s seconds" %
            (Style.RESET_ALL, str(int(time.time() - self.config.start_time))))
        log("Results are saved in " + self.directory)

    #############################################################################
    # PRIVATE FUNCTIONS
    #############################################################################

    def __to_jsonable(self):
        '''
        Returns a dict suitable for jsonification
        '''
        toRet = dict()
        toRet['uuid'] = self.uuid
        toRet['name'] = self.name
        toRet['environmentDescription'] = self.environmentDescription
        toRet['git'] = self.git
        toRet['startTime'] = self.startTime
        toRet['completionTime'] = self.completionTime
        toRet['concurrencyLevels'] = self.concurrencyLevels
        toRet['pipelineConcurrencyLevels'] = self.pipelineConcurrencyLevels
        toRet['queryIntervals'] = self.queryIntervals
        toRet['cachedQueryIntervals'] = self.cachedQueryIntervals
        toRet['frameworks'] = self.frameworks
        toRet['duration'] = self.duration
        toRet['rawData'] = self.rawData
        toRet['completed'] = self.completed
        toRet['succeeded'] = self.succeeded
        toRet['failed'] = self.failed
        toRet['verify'] = self.verify
        return toRet
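
    # For reference, __to_jsonable produces the results.json payload, whose
    # shape is roughly the following (an illustrative sketch; all values are
    # placeholders):
    #
    #   {
    #     "uuid": "...",
    #     "name": "...",
    #     "git": {"commitId": "...", "repositoryUrl": "...", "branchName": "..."},
    #     "startTime": 1500000000000,
    #     "completionTime": null,
    #     "frameworks": ["..."],
    #     "rawData": {
    #       "json": {
    #         "<framework>": [{"latencyAvg": "1.21ms", "totalRequests": 1000000}]
    #       }
    #     },
    #     "succeeded": {"json": ["<framework>"]},
    #     "failed": {"json": []},
    #     "verify": {"<framework>": {"json": "pass"}}
    #   }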

    def __write_results(self):
        try:
            with open(self.file, 'w') as f:
                f.write(json.dumps(self.__to_jsonable(), indent=2))
        except IOError:
            log("Error writing results.json")

    def __count_sloc(self):
        '''
        Counts the significant lines of code for all tests and stores in results.
        '''
        frameworks = gather_frameworks(self.config.test, self.config.exclude,
                                       self.config)

        jsonResult = {}
        for framework, testlist in frameworks.items():
            if not os.path.exists(
                    os.path.join(testlist[0].directory, "source_code")):
                log("Cannot count lines of code for %s - no 'source_code' file"
                    % framework)
                continue

            # Unfortunately the source_code files use lines like
            # ./cpoll_cppsp/www/fortune_old instead of
            # ./www/fortune_old
            # so we have to back our working dir up one level
            wd = os.path.dirname(testlist[0].directory)

            try:
                command = "cloc --list-file=%s/source_code --yaml" % testlist[
                    0].directory

                if os.path.exists(
                        os.path.join(testlist[0].directory, "cloc_defs.txt")):
                    command += " --read-lang-def %s" % os.path.join(
                        testlist[0].directory, "cloc_defs.txt")
                    log("Using custom cloc definitions for %s" % framework)

                # Find the last instance of the word 'code' in the yaml output.
                # This should be the line count for the sum of all listed files,
                # or just the line count for the last file in the case where
                # there's only one file listed.
                command = command + " | grep code | tail -1 | cut -d: -f 2"
                log("Running \"%s\" (cwd=%s)" % (command, wd))
                # universal_newlines=True so check_output returns text rather
                # than bytes under Python 3
                lineCount = subprocess.check_output(
                    command, cwd=wd, shell=True, universal_newlines=True)
                jsonResult[framework] = int(lineCount)
            except subprocess.CalledProcessError:
                continue
            except ValueError as ve:
                log("Unable to get linecount for %s due to error '%s'" %
                    (framework, ve))
        self.rawData['slocCounts'] = jsonResult

    def __count_commits(self):
        '''
        Count the git commits for all the framework tests
        '''
        frameworks = gather_frameworks(self.config.test, self.config.exclude,
                                       self.config)

        # framework is passed in explicitly; capturing the loop variable from
        # the enclosing scope would race with the loop as the threads run.
        def count_commit(directory, framework, jsonResult):
            command = "git rev-list HEAD -- " + directory + " | sort -u | wc -l"
            try:
                commitCount = subprocess.check_output(
                    command, shell=True, universal_newlines=True)
                jsonResult[framework] = int(commitCount)
            except subprocess.CalledProcessError:
                pass

        # Because git can be slow when run in large batches, this
        # calls git up to 4 times in parallel. Normal improvement is ~3-4x
        # in my trials, or ~100 seconds down to ~25.
        # This is safe to parallelize as long as each thread only
        # accesses one key in the dictionary.
        threads = []
        jsonResult = {}
        for framework, testlist in frameworks.items():
            directory = testlist[0].directory
            t = threading.Thread(
                target=count_commit, args=(directory, framework, jsonResult))
            t.start()
            threads.append(t)
            # Git has internal locks, full parallel will just cause contention
            # and slowness, so we rate-limit a bit
            if len(threads) >= 4:
                threads[0].join()
                threads.remove(threads[0])

        # Wait for remaining threads
        for t in threads:
            t.join()

        self.rawData['commitCounts'] = jsonResult
        self.config.commits = jsonResult

    def __get_git_commit_id(self):
        '''
        Get the git commit id for this benchmark
        '''
        return subprocess.check_output(
            ["git", "rev-parse", "HEAD"],
            cwd=self.config.fwroot,
            universal_newlines=True).strip()

    def __get_git_repository_url(self):
        '''
        Gets the git repository url for this benchmark
        '''
        return subprocess.check_output(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=self.config.fwroot,
            universal_newlines=True).strip()

    def __get_git_branch_name(self):
        '''
        Gets the git branch name for this benchmark
        '''
        return subprocess.check_output(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=self.config.fwroot,
            universal_newlines=True).strip()

    def __parse_stats(self, framework_test, test_type, start_time, end_time,
                      interval):
        '''
        For each test type, process all the statistics, and return a multi-layered
        dictionary that has a structure as follows:

        (timestamp)
        | (main header) - group that the stat is in
        | | (sub header) - title of the stat
        | | | (stat) - the stat itself, usually a floating point number
        '''
        stats_dict = dict()
        stats_file = self.get_stats_file(framework_test.name, test_type)
        with open(stats_file) as stats:
            # dstat doesn't output a completely compliant CSV file - we need
            # to skip the preamble, which is terminated by a blank line
            while next(stats) != "\n":
                pass
            stats_reader = csv.reader(stats)
            main_header = next(stats_reader)
            sub_header = next(stats_reader)
            time_row = sub_header.index("epoch")
            int_counter = 0
            for row in stats_reader:
                time = float(row[time_row])
                int_counter += 1
                if time < start_time:
                    continue
                elif time > end_time:
                    return stats_dict
                if int_counter % interval != 0:
                    continue
                row_dict = dict()
                for nextheader in main_header:
                    if nextheader != "":
                        row_dict[nextheader] = dict()
                header = ""
                for item_num, column in enumerate(row):
                    if len(main_header[item_num]) != 0:
                        header = main_header[item_num]
                    # all the stats are numbers, so we want to make sure that
                    # they stay that way in json
                    row_dict[header][sub_header[item_num]] = float(column)
                stats_dict[time] = row_dict
        return stats_dict
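
    # For reference, the stats file parsed above is dstat CSV output. After
    # the preamble, the body looks roughly like this (an illustrative sketch;
    # the exact column groups depend on the dstat flags used):
    #
    #   "epoch","total cpu usage",,,,,,"memory usage",,,,...
    #   "epoch","usr","sys","idl","wai","hiq","siq","used","buff","cach",...
    #   1500000000.0,2.0,1.0,96.0,1.0,0.0,0.0,512000000.0,...
    #
    # Only the first column of each group carries the main header name, which
    # is why the loop above carries the last non-empty header forward.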

    def __calculate_average_stats(self, raw_stats):
        '''
        We have a large amount of raw data for the statistics that may be useful
        for the stats nerds, but most people care about a couple of numbers. For
        now, we're only going to supply:
        * Average CPU
        * Average Memory
        * Total network use
        * Total disk use
        More may be added in the future. If they are, please update the above list.

        Note: raw_stats is directly from the __parse_stats method.
        Recall that this consists of a dictionary of timestamps, each of which
        contains a dictionary of stat categories which contain a dictionary of stats
        '''
        raw_stat_collection = dict()

        for timestamp, time_dict in raw_stats.items():
            for main_header, sub_headers in time_dict.items():
                item_to_append = None
                if 'cpu' in main_header:
                    # We want to take the idl stat and subtract it from 100
                    # to get the time that the CPU is NOT idle.
                    item_to_append = 100.0 - sub_headers['idl']
                elif main_header == 'memory usage':
                    item_to_append = sub_headers['used']
                elif 'net' in main_header:
                    # Network stats have two parts - receive and send. We'll
                    # use a tuple of style (receive, send)
                    item_to_append = (sub_headers['recv'],
                                      sub_headers['send'])
                elif 'dsk' in main_header or 'io' in main_header:
                    # Similar to network, except the tuple looks like
                    # (read, write)
                    item_to_append = (sub_headers['read'],
                                      sub_headers['writ'])

                if item_to_append is not None:
                    if main_header not in raw_stat_collection:
                        raw_stat_collection[main_header] = list()
                    raw_stat_collection[main_header].append(item_to_append)

        # Simple function to determine human readable size
        # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
        def sizeof_fmt(num):
            # We'll assume that any number we get is convertible to a float,
            # just in case
            num = float(num)
            for x in ['bytes', 'KB', 'MB', 'GB']:
                if num < 1024.0 and num > -1024.0:
                    return "%3.1f%s" % (num, x)
                num /= 1024.0
            return "%3.1f%s" % (num, 'TB')
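
        # e.g. sizeof_fmt(512) -> "512.0bytes", sizeof_fmt(2048) -> "2.0KB",
        # sizeof_fmt(3 * 1024 ** 3) -> "3.0GB"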

        # Now we have our raw stats in a readable format - we need to format
        # it for display. We need a floating point sum, so the built-in sum
        # doesn't cut it.
        display_stat_collection = dict()
        for header, values in raw_stat_collection.items():
            display_stat = None
            if 'cpu' in header:
                # CPU is a utilization percentage, not a byte count
                display_stat = "%.1f%%" % (math.fsum(values) / len(values))
            elif header == 'memory usage':
                display_stat = sizeof_fmt(math.fsum(values) / len(values))
            elif 'net' in header:
                receive, send = zip(*values)  # unzip
                display_stat = {
                    'receive': sizeof_fmt(math.fsum(receive)),
                    'send': sizeof_fmt(math.fsum(send))
                }
            else:  # 'dsk' or 'io' in header
                read, write = zip(*values)  # unzip
                display_stat = {
                    'read': sizeof_fmt(math.fsum(read)),
                    'write': sizeof_fmt(math.fsum(write))
                }
            display_stat_collection[header] = display_stat
        return display_stat_collection
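

# A minimal usage sketch (an assumption-laden illustration, not part of the
# original file: it presumes a BenchmarkConfig-like object exposing the
# attributes read in __init__, such as fwroot, timestamp, and duration, and
# framework tests with .name and .runTests):
#
#   config = BenchmarkConfig(...)               # hypothetical constructor
#   results = Results(config)
#   for test in gather_remaining_tests(config, results):
#       results.parse_all(test)                 # parse wrk output + dstat stats
#       results.write_intermediate(test.name, "Completed")
#   results.set_completion_time()
#   results.upload()                            # POST results.json if configured
#   results.finish()                            # print the verification summary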