123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- #!/usr/bin/env python3
- # #############################################################################
- # Copyright (c) 2018-present lzutao <taolzu(at)gmail.com>
- # All rights reserved.
- #
- # This source code is licensed under both the BSD-style license (found in the
- # LICENSE file in the root directory of this source tree) and the GPLv2 (found
- # in the COPYING file in the root directory of this source tree).
- # #############################################################################
- import os
- import subprocess
- import tempfile
- def valgrindTest(valgrind, datagen, fuzzer, zstd, fullbench):
- VALGRIND_ARGS = [valgrind, '--leak-check=full', '--show-leak-kinds=all', '--error-exitcode=1']
- print('\n ---- valgrind tests : memory analyzer ----')
- subprocess.check_call([*VALGRIND_ARGS, datagen, '-g50M'], stdout=subprocess.DEVNULL)
- if subprocess.call([*VALGRIND_ARGS, zstd],
- stdout=subprocess.DEVNULL) == 0:
- raise subprocess.CalledProcessError('zstd without argument should have failed')
- with subprocess.Popen([datagen, '-g80'], stdout=subprocess.PIPE) as p1, \
- subprocess.Popen([*VALGRIND_ARGS, zstd, '-', '-c'],
- stdin=p1.stdout,
- stdout=subprocess.DEVNULL) as p2:
- p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
- p2.communicate()
- if p2.returncode != 0:
- raise subprocess.CalledProcessError()
- with subprocess.Popen([datagen, '-g16KB'], stdout=subprocess.PIPE) as p1, \
- subprocess.Popen([*VALGRIND_ARGS, zstd, '-vf', '-', '-c'],
- stdin=p1.stdout,
- stdout=subprocess.DEVNULL) as p2:
- p1.stdout.close()
- p2.communicate()
- if p2.returncode != 0:
- raise subprocess.CalledProcessError()
- with tempfile.NamedTemporaryFile() as tmp_fd:
- with subprocess.Popen([datagen, '-g2930KB'], stdout=subprocess.PIPE) as p1, \
- subprocess.Popen([*VALGRIND_ARGS, zstd, '-5', '-vf', '-', '-o', tmp_fd.name],
- stdin=p1.stdout) as p2:
- p1.stdout.close()
- p2.communicate()
- if p2.returncode != 0:
- raise subprocess.CalledProcessError()
- subprocess.check_call([*VALGRIND_ARGS, zstd, '-vdf', tmp_fd.name, '-c'],
- stdout=subprocess.DEVNULL)
- with subprocess.Popen([datagen, '-g64MB'], stdout=subprocess.PIPE) as p1, \
- subprocess.Popen([*VALGRIND_ARGS, zstd, '-vf', '-', '-c'],
- stdin=p1.stdout,
- stdout=subprocess.DEVNULL) as p2:
- p1.stdout.close()
- p2.communicate()
- if p2.returncode != 0:
- raise subprocess.CalledProcessError()
- subprocess.check_call([*VALGRIND_ARGS, fuzzer, '-T1mn', '-t1'])
- subprocess.check_call([*VALGRIND_ARGS, fullbench, '-i1'])
- def main():
- import argparse
- parser = argparse.ArgumentParser(description='Valgrind tests : memory analyzer')
- parser.add_argument('valgrind', help='valgrind path')
- parser.add_argument('zstd', help='zstd path')
- parser.add_argument('datagen', help='datagen path')
- parser.add_argument('fuzzer', help='fuzzer path')
- parser.add_argument('fullbench', help='fullbench path')
- args = parser.parse_args()
- valgrind = args.valgrind
- zstd = args.zstd
- datagen = args.datagen
- fuzzer = args.fuzzer
- fullbench = args.fullbench
- valgrindTest(valgrind, datagen, fuzzer, zstd, fullbench)
- if __name__ == '__main__':
- main()
|