1
0
mirror of https://github.com/deadc0de6/dotdrop.git synced 2026-02-04 19:44:45 +00:00
Files
dotdrop/tests-ng/tests_launcher.py
2023-09-07 20:49:12 +02:00

155 lines
4.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2020, deadc0de6
tests launcher
"""
import os
import sys
import subprocess
import argparse
from concurrent import futures
from halo import Halo
# file receiving the launcher's progress log when not running on CICD
# (opened for writing by run_tests)
LOG_FILE = '/tmp/dotdrop-tests_launcher.log'
# environment variable whose presence is used by is_cicd to detect
# a github actions run
GITHUB_ENV = 'GITHUB_WORKFLOW'
def is_cicd():
    """tell whether we run inside a CICD env (github actions)"""
    # the variable only has to be defined, its value is irrelevant
    marker = os.environ.get(GITHUB_ENV)
    return marker is not None
def run_test(logfd, path):
    """run test pointed by path

    Only the basename of ``path`` is used: the script is looked up in the
    directory of the launcher (``sys.argv[0]``) and executed directly.

    Args:
        logfd: writable text stream receiving the "starting"/"done"
            markers, or a falsy value to disable logging.
        path: path (or bare file name) of the test script to run.

    Returns:
        a tuple ``(ok, reason, path, out)`` where ``ok`` is True when the
        script exited with 0 and printed no python traceback, ``reason``
        is ``'returncode'`` or ``'traceback'``, ``path`` is the path that
        was executed and ``out`` is the combined stdout/stderr.
    """
    # dirname() is '' when the launcher is started as
    # "python3 tests_launcher.py"; a bare script name would then be
    # searched on PATH instead of the current directory
    cur = os.path.dirname(sys.argv[0]) or os.curdir
    name = os.path.basename(path)
    path = os.path.join(cur, name)

    if logfd:
        logfd.write(f'starting test \"{path}\"\n')
        logfd.flush()

    # the context manager closes the pipe even if communicate() raises
    with subprocess.Popen(path, shell=False,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          universal_newlines=True,
                          encoding='utf-8',
                          errors='ignore') as proc:
        out, _ = proc.communicate()

    ret = proc.returncode == 0
    reason = 'returncode'
    # a python traceback in the output is a failure whatever the exit code
    if 'Traceback' in out:
        ret = False
        reason = 'traceback'

    if logfd:
        logfd.write(f'done test \"{path}\": ok:{ret}\n')
        logfd.flush()
    return ret, reason, path, out
def get_tests():
    """get all tests available in the launcher directory

    Only the top level of the directory holding the launcher
    (``sys.argv[0]``) is scanned; sub-directories are ignored.

    Returns:
        the sorted list of file names ending with ``.sh``.
    """
    # dirname() is '' when the launcher is started as
    # "python3 tests_launcher.py" and os.walk('') yields nothing,
    # which would silently result in zero tests being run
    cur = os.path.dirname(sys.argv[0]) or os.curdir
    # first walk entry only: the files directly inside "cur"
    _, _, filenames = next(os.walk(cur), (cur, [], []))
    return sorted(name for name in filenames if name.endswith('.sh'))
def run_tests(max_jobs=None, stop_on_first_err=True, spinner=True):
    """run the tests

    Every test returned by get_tests() is submitted to a thread pool and
    its result is reported as soon as it completes.

    Args:
        max_jobs: maximum number of tests run in parallel, passed as
            ``max_workers`` to the ThreadPoolExecutor.
        stop_on_first_err: when True, cancel the pending tests, print the
            output of the failing test and return on the first failure.
        spinner: when True, display a spinner while the tests run
            (never on github actions).

    Returns:
        True if every test succeeded, False if at least one failed.
    """
    print(f'max parallel jobs: {max_jobs}')
    print(f'stop on first error: {stop_on_first_err}')
    print(f'use spinner: {spinner}')
    tests = get_tests()
    in_cicd = is_cicd()

    # on CICD the progress log goes to stdout, otherwise to LOG_FILE
    logfd = sys.stdout
    if not in_cicd:
        # pylint: disable=R1732
        logfd = open(LOG_FILE, 'w', encoding='utf-8')

    # kept separate from the "spinner" flag so the flag is not clobbered
    halo = None
    failures = 0
    try:
        if max_jobs:
            logfd.write(f'run tests with {max_jobs} parallel job(s)\n')
        logfd.write(f'running {len(tests)} test(s)\n')
        logfd.flush()

        print()
        if spinner and not in_cicd:
            # no spinner on github actions
            halo = Halo(text='Testing', spinner='bouncingBall')
            halo.start()

        with futures.ThreadPoolExecutor(max_workers=max_jobs) as ex:
            wait_for = {ex.submit(run_test, logfd, test): test
                        for test in tests}
            logfd.flush()

            for done in futures.as_completed(wait_for):
                log = None
                try:
                    ret, reason, name, log = done.result()
                # pylint: disable=W0703
                except Exception as exc:
                    # the test could not even be run: report it as a
                    # failure of that test, with the exception as reason
                    ret, reason, name = False, exc, wait_for[done]

                if ret:
                    if not halo:
                        print(f'OK - test \"{name}\" succeeded!')
                    continue

                failures += 1
                if stop_on_first_err:
                    ex.shutdown(wait=False)
                    for job in wait_for:
                        job.cancel()
                print()
                if stop_on_first_err and log is not None:
                    print(log)
                print(f'test \"{name}\" failed: {reason}')
                if stop_on_first_err:
                    # leaving the "with" waits for the tests still running
                    return False

        sys.stdout.write('\n')
        if halo:
            halo.stop()
            halo = None
        print()
        logfd.write(f'done - ran {len(tests)} test(s)\n')
        return failures == 0
    finally:
        # runs after the executor is done, so no worker writes to a
        # closed log file
        if halo:
            halo.stop()
        if logfd is sys.stdout:
            # never close the interpreter's stdout
            logfd.flush()
        else:
            logfd.close()
def main():
    """parse the command line and launch the tests"""
    # (flags, add_argument options) for each supported switch
    switches = (
        (('-p', '--proc'), {'type': int}),
        (('-s', '--stoponerr'), {'action': 'store_true'}),
        (('-n', '--nospinner'), {'action': 'store_true'}),
    )
    parser = argparse.ArgumentParser()
    for flags, options in switches:
        parser.add_argument(*flags, **options)
    opts = parser.parse_args()

    use_spinner = not opts.nospinner
    return run_tests(max_jobs=opts.proc,
                     stop_on_first_err=opts.stoponerr,
                     spinner=use_spinner)
if __name__ == '__main__':
    # exit status 0 when main() reports success, 1 otherwise
    sys.exit(0 if main() else 1)