results.py

import os
import subprocess
import uuid
import time
import json
import requests
import threading
import re
import math
import csv

from datetime import datetime

# Cross-platform colored text
from colorama import Fore, Style


class Results:
    def __init__(self, benchmarker):
        '''
        Constructor
        '''
        self.benchmarker = benchmarker
        self.log = benchmarker.log
        self.config = benchmarker.config
        self.directory = os.path.join(self.config.results_root,
                                      self.config.timestamp)
        try:
            os.makedirs(self.directory)
        except OSError:
            # The directory already exists, which is fine.
            pass
        self.file = os.path.join(self.directory, "results.json")

        self.uuid = str(uuid.uuid4())
        self.name = datetime.now().strftime(self.config.results_name)
        self.environmentDescription = self.config.results_environment
        try:
            self.git = dict()
            self.git['commitId'] = self.__get_git_commit_id()
            self.git['repositoryUrl'] = self.__get_git_repository_url()
            self.git['branchName'] = self.__get_git_branch_name()
        except Exception:
            # Could not read local git repository, which is fine.
            self.git = None
        self.startTime = int(round(time.time() * 1000))
        self.completionTime = None
        self.concurrencyLevels = self.config.concurrency_levels
        self.pipelineConcurrencyLevels = self.config.pipeline_concurrency_levels
        self.queryIntervals = self.config.query_levels
        self.cachedQueryIntervals = self.config.cached_query_levels
        self.frameworks = [t.name for t in benchmarker.tests]
        self.duration = self.config.duration

        # One bucket per test type: raw benchmark data plus pass/fail lists.
        test_types = ['json', 'db', 'query', 'fortune', 'update',
                      'plaintext', 'cached_query']
        self.rawData = {test_type: dict() for test_type in test_types}
        self.completed = dict()
        self.succeeded = {test_type: [] for test_type in test_types}
        self.failed = {test_type: [] for test_type in test_types}
        self.verify = dict()

    #############################################################################
    # PUBLIC FUNCTIONS
    #############################################################################

    def parse(self, tests):
        '''
        Parses the raw data of all tests and aggregates it, together with
        the commit counts and line-of-code counts, into the results.json file.
        '''
        # Run the method to get the commit count of each framework.
        self.__count_commits()
        # Call the method which counts the sloc for each framework
        self.__count_sloc()

        # Time to create parsed files
        # Aggregate JSON file
        with open(self.file, "w") as f:
            f.write(json.dumps(self.__to_jsonable(), indent=2))

    def parse_test(self, framework_test, test_type):
        '''
        Parses the given test and test_type from the raw_file.
        '''
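        # The raw file interleaves warmup/primer runs with measured runs; the
        # loop below keys off marker lines to decide which sections count.
        # A hedged sketch of the layout this parser expects (illustrative,
        # not a captured file):
        #
        #   Running Primer ...
        #   Running Warmup ...
        #   Concurrency: 16 for json
        #   ...load-generator output...
        #   STARTTIME 1505181256
        #   ENDTIME 1505181271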
        results = dict()
        results['results'] = []
        stats = []

        if os.path.exists(self.get_raw_file(framework_test.name, test_type)):
            with open(self.get_raw_file(framework_test.name,
                                        test_type)) as raw_data:

                is_warmup = True
                rawData = None
                for line in raw_data:
                    if "Queries:" in line or "Concurrency:" in line:
                        is_warmup = False
                        rawData = None
                        continue
                    if "Warmup" in line or "Primer" in line:
                        is_warmup = True
                        continue
                    if not is_warmup:
                        if rawData is None:
                            rawData = dict()
                            results['results'].append(rawData)
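                        # The checks below pick apart the load generator's
                        # summary. For wrk, the latency line looks roughly
                        # like this (hedged, based on wrk's usual report
                        # format, not a captured run):
                        #
                        #   Latency   631.86us  204.97us   4.33ms   77.16%
                        #
                        # i.e. average, stdev, max, and +/- stdev percentage.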
                        if "Latency" in line:
                            m = re.findall(r"([0-9]+\.*[0-9]*[us|ms|s|m|%]+)",
                                           line)
                            if len(m) == 4:
                                rawData['latencyAvg'] = m[0]
                                rawData['latencyStdev'] = m[1]
                                rawData['latencyMax'] = m[2]
                        if "requests in" in line:
                            m = re.search("([0-9]+) requests in", line)
                            if m is not None:
                                rawData['totalRequests'] = int(m.group(1))
                        if "Socket errors" in line:
                            if "connect" in line:
                                m = re.search("connect ([0-9]+)", line)
                                rawData['connect'] = int(m.group(1))
                            if "read" in line:
                                m = re.search("read ([0-9]+)", line)
                                rawData['read'] = int(m.group(1))
                            if "write" in line:
                                m = re.search("write ([0-9]+)", line)
                                rawData['write'] = int(m.group(1))
                            if "timeout" in line:
                                m = re.search("timeout ([0-9]+)", line)
                                rawData['timeout'] = int(m.group(1))
                        if "Non-2xx" in line:
                            m = re.search("Non-2xx or 3xx responses: ([0-9]+)",
                                          line)
                            if m is not None:
                                rawData['5xx'] = int(m.group(1))
                        if "STARTTIME" in line:
                            m = re.search("[0-9]+", line)
                            rawData["startTime"] = int(m.group(0))
                        if "ENDTIME" in line:
                            m = re.search("[0-9]+", line)
                            rawData["endTime"] = int(m.group(0))
                            test_stats = self.__parse_stats(
                                framework_test, test_type,
                                rawData["startTime"], rawData["endTime"], 1)
                            stats.append(test_stats)
        with open(
                self.get_stats_file(framework_test.name, test_type) + ".json",
                "w") as stats_file:
            json.dump(stats, stats_file, indent=2)

        return results

    def parse_all(self, framework_test):
        '''
        Method meant to be run for a given timestamp
        '''
        for test_type in framework_test.runTests:
            if os.path.exists(
                    self.get_raw_file(framework_test.name, test_type)):
                results = self.parse_test(framework_test, test_type)
                self.report_benchmark_results(framework_test, test_type,
                                              results['results'])

    def write_intermediate(self, test_name, status_message):
        '''
        Writes the intermediate results for the given test_name and status_message
        '''
        self.completed[test_name] = status_message
        self.__write_results()

    def set_completion_time(self):
        '''
        Sets the completionTime for these results and writes the results
        '''
        self.completionTime = int(round(time.time() * 1000))
        self.__write_results()

    def upload(self):
        '''
        Attempts to upload the results.json to the configured results_upload_uri
        '''
        if self.config.results_upload_uri is not None:
            try:
                requests.post(
                    self.config.results_upload_uri,
                    headers={'Content-Type': 'application/json'},
                    data=json.dumps(self.__to_jsonable(), indent=2))
            except Exception:
                self.log("Error uploading results.json")

    def load(self):
        '''
        Load the results.json file
        '''
        try:
            with open(self.file) as f:
                self.__dict__.update(json.load(f))
        except (ValueError, IOError):
            pass

    def get_raw_file(self, test_name, test_type):
        '''
        Returns the output file for this test_name and test_type
        Example: fw_root/results/timestamp/test_type/test_name/raw.txt
        '''
        path = os.path.join(self.directory, test_name, test_type, "raw.txt")
        try:
            os.makedirs(os.path.dirname(path))
        except OSError:
            pass
        return path

    def get_stats_file(self, test_name, test_type):
        '''
        Returns the stats file name for this test_name and test_type
        Example: fw_root/results/timestamp/test_type/test_name/stats.txt
        '''
        path = os.path.join(self.directory, test_name, test_type, "stats.txt")
        try:
            os.makedirs(os.path.dirname(path))
        except OSError:
            pass
        return path

    def report_verify_results(self, framework_test, test_type, result):
        '''
        Used by FrameworkTest to add verification details to our results
        TODO: Technically this is an IPC violation - we are accessing
        the parent process' memory from the child process
        '''
        if framework_test.name not in self.verify:
            self.verify[framework_test.name] = dict()
        self.verify[framework_test.name][test_type] = result

    def report_benchmark_results(self, framework_test, test_type, results):
        '''
        Used by FrameworkTest to add benchmark data to this
        TODO: Technically this is an IPC violation - we are accessing
        the parent process' memory from the child process
        '''
        if test_type not in self.rawData:
            self.rawData[test_type] = dict()

        # If results has a size from the parse, then it succeeded.
        if results:
            self.rawData[test_type][framework_test.name] = results

            # This may already be set for single-tests
            if framework_test.name not in self.succeeded[test_type]:
                self.succeeded[test_type].append(framework_test.name)
        else:
            # This may already be set for single-tests
            if framework_test.name not in self.failed[test_type]:
                self.failed[test_type].append(framework_test.name)

    def finish(self):
        '''
        Finishes these results.
        '''
        if not self.config.parse:
            # Normally you don't have to use Fore.BLUE before each line, but
            # Travis-CI seems to reset color codes on newline (see
            # travis-ci/travis-ci#2692) or stream flush, so we have to ensure
            # that the color code is printed repeatedly
            self.log("Verification Summary",
                     border='=',
                     border_bottom='-',
                     color=Fore.CYAN)
            for test in self.benchmarker.tests:
                self.log(Fore.CYAN + "| {!s}".format(test.name))
                if test.name in self.verify:
                    for test_type, result in self.verify[test.name].items():
                        if result.upper() == "PASS":
                            color = Fore.GREEN
                        elif result.upper() == "WARN":
                            color = Fore.YELLOW
                        else:
                            color = Fore.RED
                        self.log(Fore.CYAN + "| " + test_type.ljust(13) +
                                 ' : ' + color + result.upper())
                else:
                    self.log(Fore.CYAN + "| " + Fore.RED +
                             "NO RESULTS (Did framework launch?)")
            self.log('', border='=', border_bottom='', color=Fore.CYAN)

        self.log("Results are saved in " + self.directory)

    #############################################################################
    # PRIVATE FUNCTIONS
    #############################################################################

    def __to_jsonable(self):
        '''
        Returns a dict suitable for jsonification
        '''
        toRet = dict()

        toRet['uuid'] = self.uuid
        toRet['name'] = self.name
        toRet['environmentDescription'] = self.environmentDescription
        toRet['git'] = self.git
        toRet['startTime'] = self.startTime
        toRet['completionTime'] = self.completionTime
        toRet['concurrencyLevels'] = self.concurrencyLevels
        toRet['pipelineConcurrencyLevels'] = self.pipelineConcurrencyLevels
        toRet['queryIntervals'] = self.queryIntervals
        toRet['cachedQueryIntervals'] = self.cachedQueryIntervals
        toRet['frameworks'] = self.frameworks
        toRet['duration'] = self.duration
        toRet['rawData'] = self.rawData
        toRet['completed'] = self.completed
        toRet['succeeded'] = self.succeeded
        toRet['failed'] = self.failed
        toRet['verify'] = self.verify

        return toRet

    def __write_results(self):
        try:
            with open(self.file, 'w') as f:
                f.write(json.dumps(self.__to_jsonable(), indent=2))
        except IOError:
            self.log("Error writing results.json")

    def __count_sloc(self):
        '''
        Counts the significant lines of code for all tests and stores in results.
        '''
        frameworks = self.benchmarker.metadata.gather_frameworks(
            self.config.test, self.config.exclude)

        framework_to_count = {}
        for framework, testlist in frameworks.items():
            wd = testlist[0].directory

            # Find the last instance of the word 'code' in the yaml output. This
            # should be the line count for the sum of all listed files or just
            # the line count for the last file in the case where there's only
            # one file listed.
            command = "cloc --yaml --follow-links . | grep code | tail -1 | cut -d: -f 2"
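            # A hedged sketch of the tail of `cloc --yaml` output that the
            # pipeline above slices (illustrative numbers, not a real run):
            #
            #   SUM:
            #     blank: 31
            #     comment: 14
            #     code: 256
            #
            # grep/tail/cut then leave just " 256" for int() to parse.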
            self.log("Running \"%s\" (cwd=%s)" % (command, wd))
            try:
                line_count = int(subprocess.check_output(command, cwd=wd, shell=True))
            except (subprocess.CalledProcessError, ValueError) as e:
                self.log("Unable to count lines of code for %s due to error '%s'" %
                         (framework, e))
                continue

            self.log("Counted %s lines of code" % line_count)
            framework_to_count[framework] = line_count

        self.rawData['slocCounts'] = framework_to_count

    def __count_commits(self):
        '''
        Count the git commits for all the framework tests
        '''
        frameworks = self.benchmarker.metadata.gather_frameworks(
            self.config.test, self.config.exclude)

        def count_commit(framework, directory, jsonResult):
            # The framework name is passed in explicitly so that each thread
            # writes to its own key, rather than to whatever the loop
            # variable happens to hold when the thread runs.
            command = "git rev-list HEAD -- " + directory + " | sort -u | wc -l"
            try:
                commitCount = subprocess.check_output(command, shell=True)
                jsonResult[framework] = int(commitCount)
            except subprocess.CalledProcessError:
                pass

        # Because git can be slow when run in large batches, this
        # calls git up to 4 times in parallel. Normal improvement is ~3-4x
        # in my trials, or ~100 seconds down to ~25.
        # This is safe to parallelize as long as each thread only
        # accesses one key in the dictionary.
        threads = []
        jsonResult = {}
        for framework, testlist in frameworks.items():
            directory = testlist[0].directory
            t = threading.Thread(
                target=count_commit, args=(framework, directory, jsonResult))
            t.start()
            threads.append(t)
            # Git has internal locks, full parallel will just cause contention
            # and slowness, so we rate-limit a bit
            if len(threads) >= 4:
                threads[0].join()
                threads.remove(threads[0])

        # Wait for remaining threads
        for t in threads:
            t.join()

        self.rawData['commitCounts'] = jsonResult
        self.config.commits = jsonResult

    def __get_git_commit_id(self):
        '''
        Gets the git commit id for this benchmark
        '''
        return subprocess.check_output(
            ["git", "rev-parse", "HEAD"], cwd=self.config.fw_root).strip()

    def __get_git_repository_url(self):
        '''
        Gets the git repository url for this benchmark
        '''
        return subprocess.check_output(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=self.config.fw_root).strip()

    def __get_git_branch_name(self):
        '''
        Gets the git branch name for this benchmark
        '''
        return subprocess.check_output(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=self.config.fw_root).strip()

    def __parse_stats(self, framework_test, test_type, start_time, end_time,
                      interval):
        '''
        For each test type, process all the statistics, and return a multi-layered
        dictionary that has a structure as follows:

        (timestamp)
        | (main header) - group that the stat is in
        | | (sub header) - title of the stat
        | | | (stat) - the stat itself, usually a floating point number
        '''
        stats_dict = dict()
        stats_file = self.get_stats_file(framework_test.name, test_type)
        with open(stats_file) as stats:
            # dstat doesn't output a completely compliant CSV file - we need
            # to strip the header
            while next(stats) != "\n":
                pass
            stats_reader = csv.reader(stats)
            main_header = next(stats_reader)
            sub_header = next(stats_reader)
            time_row = sub_header.index("epoch")
            int_counter = 0
            for row in stats_reader:
                time = float(row[time_row])
                int_counter += 1
                if time < start_time:
                    continue
                elif time > end_time:
                    return stats_dict
                if int_counter % interval != 0:
                    continue
                row_dict = dict()
                for nextheader in main_header:
                    if nextheader != "":
                        row_dict[nextheader] = dict()
                header = ""
                for item_num, column in enumerate(row):
                    if len(main_header[item_num]) != 0:
                        header = main_header[item_num]
                    # All the stats are numbers, so we want to make sure that
                    # they stay that way in json
                    row_dict[header][sub_header[item_num]] = float(column)
                stats_dict[time] = row_dict
        return stats_dict

    def __calculate_average_stats(self, raw_stats):
        '''
        We have a large amount of raw data for the statistics that may be useful
        for the stats nerds, but most people care about a couple of numbers. For
        now, we're only going to supply:
          * Average CPU
          * Average Memory
          * Total network use
          * Total disk use
        More may be added in the future. If they are, please update the above list.

        Note: raw_stats is directly from the __parse_stats method.
        Recall that this consists of a dictionary of timestamps, each of which
        contains a dictionary of stat categories which contain a dictionary of stats
        '''
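        # A hedged sketch of the shape this method builds up, assuming the
        # usual dstat groups are present (keys follow dstat's group headers):
        #
        #   {
        #       'total cpu usage': <average non-idle CPU>,
        #       'memory usage': <average bytes used>,
        #       'net/total': {'receive': ..., 'send': ...},
        #       'dsk/total': {'read': ..., 'write': ...},
        #   }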
        raw_stat_collection = dict()

        for time_dict in raw_stats.values():
            for main_header, sub_headers in time_dict.items():
                item_to_append = None
                if 'cpu' in main_header:
                    # We want to take the idl stat and subtract it from 100
                    # to get the time that the CPU is NOT idle.
                    item_to_append = 100.0 - sub_headers['idl']
                elif main_header == 'memory usage':
                    item_to_append = sub_headers['used']
                elif 'net' in main_header:
                    # Network stats have two parts - receive and send. We'll
                    # use a tuple of style (receive, send)
                    item_to_append = (sub_headers['recv'], sub_headers['send'])
                elif 'dsk' in main_header or 'io' in main_header:
                    # Similar to network, except our tuple looks like (read, write)
                    item_to_append = (sub_headers['read'], sub_headers['writ'])

                if item_to_append is not None:
                    if main_header not in raw_stat_collection:
                        raw_stat_collection[main_header] = list()
                    raw_stat_collection[main_header].append(item_to_append)

        # Simple function to determine human readable size
        # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
        def sizeof_fmt(num):
            # We'll assume that any number we get is convertible to a float,
            # just in case
            num = float(num)
            for x in ['bytes', 'KB', 'MB', 'GB']:
                if 1024.0 > num > -1024.0:
                    return "%3.1f%s" % (num, x)
                num /= 1024.0
            return "%3.1f%s" % (num, 'TB')
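        # For example (hedged, computed by hand rather than captured output):
        #   sizeof_fmt(123456)  -> '120.6KB'
        #   sizeof_fmt(3.5e9)   -> '3.3GB'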

        # Now we have our raw stats in a readable format - we need to format
        # it for display. We need a floating point sum, so the built-in sum
        # doesn't cut it
        display_stat_collection = dict()
        for header, values in raw_stat_collection.items():
            display_stat = None
            if 'cpu' in header:
                display_stat = sizeof_fmt(math.fsum(values) / len(values))
            elif header == 'memory usage':
                display_stat = sizeof_fmt(math.fsum(values) / len(values))
            elif 'net' in header:
                receive, send = zip(*values)  # unzip
                display_stat = {
                    'receive': sizeof_fmt(math.fsum(receive)),
                    'send': sizeof_fmt(math.fsum(send))
                }
            else:  # 'dsk' or 'io' in header
                read, write = zip(*values)  # unzip
                display_stat = {
                    'read': sizeof_fmt(math.fsum(read)),
                    'write': sizeof_fmt(math.fsum(write))
                }
            display_stat_collection[header] = display_stat
        return display_stat_collection