#!/usr/bin/env python3
"""Run a test repeatedly with mallocfail.

Usage: run_mallocfail [--ctest=<cost>] [--jobs=<n>] [<exe>...]

This runs the programs with mallocfail until there are no more additional
stack hashes for mallocfail to try out.

Passing "--ctest" additionally runs all the tests with a cost of at most the
given flag value. Cost is measured in seconds of runtime.

Passing "--jobs" sets the number of programs that are run in parallel
(default: 1).

You need to build mallocfail (https://github.com/ralight/mallocfail) and
install it to /usr/local/lib/mallocfail.so. Change _MALLOCFAIL_SO below if you
want it elsewhere.
"""
import glob
import multiprocessing
import os
import shutil
import subprocess
import sys
import tempfile
import time
from typing import Dict
from typing import List
from typing import NoReturn
from typing import Optional
from typing import Tuple

# Program used to prime the hashes file before the real programs are run.
_PRIMER = "./unit_util_test"
# Shared library that is preloaded into every program under test.
_MALLOCFAIL_SO = "/usr/local/lib/mallocfail.so"
# Hashes file name (looked up in the working directory of each run) and the
# name of the snapshot taken of it before each run.
_HASHES = "mallocfail_hashes"
_HASHES_PREV = "mallocfail_hashes.prev"
# Default per-run timeout in seconds.
_TIMEOUT = 3.0

# Complete environment of the programs under test. Nothing is inherited from
# the environment of this script.
_ENV = {
    "LD_PRELOAD": _MALLOCFAIL_SO,
    "UBSAN_OPTIONS": "color=always,print_stacktrace=1,exitcode=11",
}


def _hashes_unchanged(hashes: str, hashes_prev: str) -> bool:
    """Return True if a previous snapshot exists and equals the hashes file."""
    if not os.path.exists(hashes_prev):
        return False
    with open(hashes_prev, "r") as prev, open(hashes, "r") as cur:
        return prev.read() == cur.read()


def run_mallocfail(tmpdir: str, timeout: float, exe: str,
                   iteration: int) -> bool:
    """Run a program once with mallocfail.

    Args:
        tmpdir: Working directory for the program; the hashes file lives here.
        timeout: Maximum runtime of the program in seconds.
        exe: Program to run. It is started with tmpdir as working directory.
        iteration: Run counter, only used in the progress message.

    Returns:
        False if the run added no new stack hashes (nothing left to try),
        True if the program should be run again.

    Raises:
        OSError: If the program could not be started.
        RuntimeError: If no hashes file exists after the run, or if the
            program exited in a way that is not considered clean.
    """
    print(f"\x1b[1;33mmallocfail '{exe}' run #{iteration}\x1b[0m")
    hashes = os.path.join(tmpdir, _HASHES)
    hashes_prev = os.path.join(tmpdir, _HASHES_PREV)
    # Snapshot the hashes so we can tell whether this run added any.
    if os.path.exists(hashes):
        shutil.copy(hashes, hashes_prev)

    # Stays None if the program timed out.
    returncode: Optional[int] = None
    try:
        returncode = subprocess.run([exe],
                                    timeout=timeout,
                                    env=_ENV,
                                    cwd=tmpdir).returncode
    except subprocess.TimeoutExpired:
        print(f"\x1b[1;34mProgram {exe} timed out\x1b[0m")

    # The comparison deliberately happens after the try statement instead of
    # in a "finally" clause: returning from "finally" would silently discard
    # errors such as a missing or non-executable program.
    if not os.path.exists(hashes):
        raise RuntimeError(
            f"No hashes file '{hashes}' after running '{exe}'; "
            f"is mallocfail installed at {_MALLOCFAIL_SO}?")
    if _hashes_unchanged(hashes, hashes_prev):
        # Done: no new stack hashes.
        return False

    if returncode is None:
        # Timed out, but new hashes were recorded: keep going.
        return True

    # A negative return code means the program was killed by that signal.
    if returncode in (0, 1):
        # Process exited cleanly (success or failure).
        pass
    elif returncode == -6:
        # Assertion failed.
        pass
    elif returncode == -14:
        print(f"\x1b[0;34mProgram '{exe}' timed out\x1b[0m")
    else:
        print(
            f"\x1b[1;34mProgram '{exe}' failed to handle OOM situation cleanly\x1b[0m"
        )
        raise RuntimeError("Aborting test")

    return True


def ctest_costs() -> Dict[str, float]:
    """Read test costs from Testing/Temporary/CTestCostData.txt.

    Every line up to the first one starting with "---" must end in two
    space-separated fields; the last one is the cost, the one before it is
    ignored, and everything in front is the test name.

    Returns:
        A mapping from test name to cost.

    Raises:
        OSError: If the cost data file cannot be opened.
        ValueError: If a non-blank line has fewer than three fields or a cost
            that is not a number.
    """
    costs: Dict[str, float] = {}
    with open("Testing/Temporary/CTestCostData.txt", "r") as fh:
        for line in fh:
            if line.startswith("---"):
                break
            line = line.strip()
            if not line:
                # Tolerate blank lines instead of failing to unpack them.
                continue
            # Split from the right so names containing spaces stay intact.
            prog, _, cost = line.rsplit(" ", 2)
            costs[prog] = float(cost)
    return costs


def find_prog(name: str) -> Tuple[Optional[str], ...]:
    """Look for the unit and auto test programs of a test name.

    Returns:
        A pair (unit test path, auto test path), relative to the current
        directory. Each entry is None if that program does not exist.
    """
    candidates = (f"./unit_{name}_test", f"./auto_{name}_test")
    return tuple(
        path if os.path.exists(path) else None for path in candidates)


def parse_flags(args: List[str]) -> Tuple[Dict[str, str], List[str]]:
    """Split command line arguments into flags and programs.

    Every argument starting with "--" is a flag of the form "--name=value";
    a flag given without "=" gets the empty string as its value. All other
    arguments are programs.

    Returns:
        A pair (flags, programs). Flag names keep their leading dashes.
    """
    flags: Dict[str, str] = {}
    exes: List[str] = []
    for arg in args:
        if arg.startswith("--"):
            # partition() never fails, unlike unpacking the result of split().
            flag, _, value = arg.partition("=")
            flags[flag] = value
        else:
            exes.append(arg)
    return flags, exes


def loop_mallocfail(tmpdir: str, timeout: float, exe: str) -> None:
    """Run a program with mallocfail until a run adds no new stack hashes."""
    i = 1
    while run_mallocfail(tmpdir, timeout, exe, i):
        i += 1


def isolated_mallocfail(timeout: float, exe: str) -> None:
    """Run the mallocfail loop for a program in its own temporary directory.

    The program is copied into a fresh temporary directory together with the
    hashes file of the current directory (if there is one). The directory is
    removed again when the loop finishes.

    Args:
        timeout: Maximum runtime per run in seconds.
        exe: Path of the program, relative to the current directory or
            absolute.
    """
    with tempfile.TemporaryDirectory(prefix="mallocfail") as tmpdir:
        print(f"\x1b[1;33mRunning for {exe} in isolated path {tmpdir}\x1b[0m")
        # Always place the copy directly in tmpdir and run it as "./name":
        # this also works for programs given with a directory component, an
        # absolute path, or a bare name.
        local_exe = os.path.join(".", os.path.basename(exe))
        shutil.copy(exe, os.path.join(tmpdir, local_exe))
        # Without a primer run there are no hashes to start from.
        if os.path.exists(_HASHES):
            shutil.copy(_HASHES, os.path.join(tmpdir, _HASHES))
        loop_mallocfail(tmpdir, timeout, local_exe)


def main(args: List[str]) -> None:
    """Run a program repeatedly under mallocfail.

    Args:
        args: The full command line, including the program name at index 0.
    """
    if len(args) == 1:
        print(f"Usage: {args[0]} [--ctest=<cost>] [--jobs=<n>] [<exe>...]")
        sys.exit(1)

    flags, exes = parse_flags(args[1:])

    timeout = _TIMEOUT
    if "--ctest" in flags:
        costs = ctest_costs()
        max_cost = float(flags["--ctest"])
        # Leave some headroom over the most expensive selected test.
        timeout = max(max_cost + 1, timeout)
        for test, cost in costs.items():
            if cost <= max_cost:
                exes.extend(prog for prog in find_prog(test) if prog)

    jobs = int(flags.get("--jobs", "1"))

    # Start by running util_test, which allocates no memory of its own, just
    # to prime the mallocfail hashes so it doesn't make global initialisers
    # such as llvm_gcov_init fail.
    if os.path.exists(_PRIMER):
        print("\x1b[1;33mPriming hashes with unit_util_test\x1b[0m")
        loop_mallocfail(".", timeout, _PRIMER)

    print("\x1b[1;33m--------------------------------\x1b[0m")
    print(f"\x1b[1;33mStarting mallocfail for {len(exes)} programs:\x1b[0m")
    print(f"\x1b[1;33m{exes}\x1b[0m")
    print("\x1b[1;33m--------------------------------\x1b[0m")
    time.sleep(1)
    with multiprocessing.Pool(jobs) as p:
        done = tuple(
            p.starmap(isolated_mallocfail, ((timeout, exe) for exe in exes)))
    print(f"\x1b[1;32mCompleted {len(done)} programs\x1b[0m")


if __name__ == "__main__":
    main(sys.argv)