diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py
index 5e8dba5dbde71a..2eb517047806ba 100644
--- a/Lib/test/libregrtest/__init__.py
+++ b/Lib/test/libregrtest/__init__.py
@@ -1,2 +1,6 @@
-from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES
-from test.libregrtest.main import main
+from test.support import TestStats
+from .cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES
+from .result import FilterTuple, State, TestResult
+from .runtests import TestsTuple, FilterDict, RunTests
+from .results import TestsList, Results
+from .main import main
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
index d1a590d8c1a5b3..bd836a6afd3431 100644
--- a/Lib/test/libregrtest/cmdline.py
+++ b/Lib/test/libregrtest/cmdline.py
@@ -2,6 +2,7 @@
 import os
 import shlex
 import sys
+
 from test.support import os_helper
 
 
@@ -154,7 +155,7 @@ def __init__(self, **kwargs) -> None:
         self.fromfile = None
         self.fail_env_changed = False
         self.use_resources = None
-        self.trace = False
+        self.coverage = False
         self.coverdir = 'coverage'
         self.runleaks = False
         self.huntrleaks = False
@@ -170,7 +171,8 @@ def __init__(self, **kwargs) -> None:
         self.ignore_tests = None
         self.pgo = False
         self.pgo_extended = False
-
+        self.threshold = None
+        self.wait = False
 
         super().__init__(**kwargs)
 
@@ -205,7 +207,7 @@ def _create_parser():
     group.add_argument('--wait', action='store_true',
                        help='wait for user input, e.g., allow a debugger '
                             'to be attached')
-    group.add_argument('--worker-args', metavar='ARGS')
+    group.add_argument('--worker-json', metavar='ARGS')
     group.add_argument('-S', '--start', metavar='START',
                        help='the name of the test at which to start.' +
                             more_details)
@@ -283,7 +285,6 @@ def _create_parser():
                        dest='use_mp', type=int,
                        help='run PROCESSES processes at once')
     group.add_argument('-T', '--coverage', action='store_true',
-                       dest='trace',
                        help='turn on code coverage tracing using the trace '
                             'module')
     group.add_argument('-D', '--coverdir', metavar='DIR',
@@ -378,11 +379,11 @@ def _parse_args(args, **kwargs):
 
     if ns.single and ns.fromfile:
         parser.error("-s and -f don't go together!")
-    if ns.use_mp is not None and ns.trace:
+    if ns.use_mp is not None and ns.coverage:
         parser.error("-T and -j don't go together!")
     if ns.python is not None:
         if ns.use_mp is None:
-            parser.error("-p requires -j!")
+            parser.error("--python option requires the -j option!")
         # The "executable" may be two or more parts, e.g. "node python.js"
         ns.python = shlex.split(ns.python)
     if ns.failfast and not (ns.verbose or ns.verbose3):
@@ -401,10 +402,6 @@ def _parse_args(args, **kwargs):
     if ns.timeout is not None:
         if ns.timeout <= 0:
             ns.timeout = None
-    if ns.use_mp is not None:
-        if ns.use_mp <= 0:
-            # Use all cores + extras for tests that like to sleep
-            ns.use_mp = 2 + (os.cpu_count() or 1)
     if ns.use:
         for a in ns.use:
             for r in a:
@@ -448,4 +445,13 @@ def _parse_args(args, **kwargs):
         # --forever implies --failfast
         ns.failfast = True
 
+    if ns.huntrleaks:
+        warmup, repetitions, _ = ns.huntrleaks
+        if warmup < 1 or repetitions < 1:
+            msg = ("Invalid values for the --huntrleaks/-R parameters. The "
+                   "number of warmups and repetitions must be at least 1 "
+                   "each (1:1).")
+            print(msg, file=sys.stderr, flush=True)
+            sys.exit(2)
+
     return ns
diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py
new file mode 100644
index 00000000000000..ac5e3dc8cac5fa
--- /dev/null
+++ b/Lib/test/libregrtest/findtests.py
@@ -0,0 +1,92 @@
+import os.path
+import sys
+import unittest
+from test import support
+
+from .result import FilterTuple
+from .utils import abs_module_name, count, printlist
+
+
+# If these test directories are encountered recurse into them and treat each
+# test_ .py or dir as a separate test module. This can increase parallelism.
+# Beware this can't generally be done for any directory with sub-tests as the
+# __init__.py may do things which alter what tests are to be run.
+
+SPLITTESTDIRS = frozenset({
+    "test_asyncio",
+    "test_concurrent_futures",
+    "test_multiprocessing_fork",
+    "test_multiprocessing_forkserver",
+    "test_multiprocessing_spawn",
+})
+
+
+def findtestdir(path=None):
+    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
+
+
+def findtests(*, testdir=None, exclude=(),
+              split_test_dirs=SPLITTESTDIRS, base_mod=""):
+    """Return a list of all applicable test modules."""
+    testdir = findtestdir(testdir)
+    tests = []
+    for name in os.listdir(testdir):
+        mod, ext = os.path.splitext(name)
+        if (not mod.startswith("test_")) or (mod in exclude):
+            continue
+        if mod in split_test_dirs:
+            subdir = os.path.join(testdir, mod)
+            mod = f"{base_mod or 'test'}.{mod}"
+            tests.extend(findtests(testdir=subdir, exclude=exclude,
+                                   split_test_dirs=split_test_dirs, base_mod=mod))
+        elif ext in (".py", ""):
+            tests.append(f"{base_mod}.{mod}" if base_mod else mod)
+    return sorted(tests)
+
+
+def split_test_packages(tests, *, testdir=None, exclude=(),
+                        split_test_dirs=SPLITTESTDIRS):
+    testdir = findtestdir(testdir)
+    splitted = []
+    for name in tests:
+        if name in split_test_dirs:
+            subdir = os.path.join(testdir, name)
+            splitted.extend(findtests(testdir=subdir, exclude=exclude,
+                                      split_test_dirs=split_test_dirs,
+                                      base_mod=name))
+        else:
+            splitted.append(name)
+    return splitted
+
+
+def _list_cases(suite):
+    for test in suite:
+        if isinstance(test, unittest.loader._FailedTest):
+            continue
+        if isinstance(test, unittest.TestSuite):
+            _list_cases(test)
+        elif isinstance(test, unittest.TestCase):
+            if support.match_test(test):
+                print(test.id())
+
+def list_cases(tests, *, test_dir: str,
+               match_tests: FilterTuple | None = None,
+               ignore_tests: FilterTuple | None = None) -> None:
+    support.verbose = False
+    support.set_match_tests(match_tests, ignore_tests)
+
+    skipped = []
+    for test_name in tests:
+        module_name = abs_module_name(test_name, test_dir)
+        try:
+            suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
+            _list_cases(suite)
+        except unittest.SkipTest:
+            skipped.append(test_name)
+
+    if skipped:
+        sys.stdout.flush()
+        stderr = sys.stderr
+        print(file=stderr)
+        print(count(len(skipped), "test"), "skipped:", file=stderr)
+        printlist(skipped, file=stderr)
diff --git a/Lib/test/libregrtest/logger.py b/Lib/test/libregrtest/logger.py
new file mode 100644
index 00000000000000..ed434465a2754c
--- /dev/null
+++ b/Lib/test/libregrtest/logger.py
@@ -0,0 +1,89 @@
+import os
+import sys
+import time
+
+from . 
import RunTests + + +class Logger: + def __init__(self, pgo: bool): + self.start_time = time.perf_counter() + self.win_load_tracker = None + self.pgo = pgo + + # used to display the progress bar "[ 3/100]" + self.test_count_text = '' + self.test_count_width = 1 + + def set_tests(self, runtests: RunTests): + if runtests.forever: + self.test_count_text = '' + self.test_count_width = 3 + else: + self.test_count_text = '/{}'.format(len(runtests.tests)) + self.test_count_width = len(self.test_count_text) - 1 + + def start_load_tracker(self): + if sys.platform != 'win32': + return + + # If we're on windows and this is the parent runner (not a worker), + # track the load average. + from .win_utils import WindowsLoadTracker + + try: + self.win_load_tracker = WindowsLoadTracker() + except PermissionError as error: + # Standard accounts may not have access to the performance + # counters. + print(f'Failed to create WindowsLoadTracker: {error}') + + def stop_load_tracker(self): + if self.win_load_tracker is not None: + self.win_load_tracker.close() + self.win_load_tracker = None + + def get_time(self): + return time.perf_counter() - self.start_time + + def getloadavg(self): + if self.win_load_tracker is not None: + return self.win_load_tracker.getloadavg() + + if hasattr(os, 'getloadavg'): + return os.getloadavg()[0] + + return None + + def log(self, line=''): + empty = not line + + # add the system load prefix: "load avg: 1.80 " + load_avg = self.getloadavg() + if load_avg is not None: + line = f"load avg: {load_avg:.2f} {line}" + + # add the timestamp prefix: "0:01:05 " + test_time = self.get_time() + + mins, secs = divmod(int(test_time), 60) + hours, mins = divmod(mins, 60) + test_time = "%d:%02d:%02d" % (hours, mins, secs) + + line = f"{test_time} {line}" + if empty: + line = line[:-1] + + print(line, flush=True) + + def display_progress(self, test_index, text, results, runtests): + quiet = runtests.quiet + if quiet: + return + + # "[ 51/405/1] test_tcl passed" + line = f"{test_index:{self.test_count_width}}{self.test_count_text}" + fails = len(results.bad) + len(results.environment_changed) + if fails and not self.pgo: + line = f"{line}/{fails}" + self.log(f"[{line}] {text}") diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index ab03647ca5802f..30041cc6a59190 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -1,39 +1,34 @@ import faulthandler -import locale import os -import platform import random import re import sys -import sysconfig -import tempfile import time -import unittest -from test.libregrtest.cmdline import _parse_args -from test.libregrtest.runtest import ( - findtests, split_test_packages, runtest, abs_module_name, - PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests) -from test.libregrtest.setup import setup_tests -from test.libregrtest.pgo import setup_pgo_tests -from test.libregrtest.utils import (strip_py_suffix, count, format_duration, - printlist, get_build_info) + from test import support -from test.support import TestStats from test.support import os_helper from test.support import threading_helper - -# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). -# Used to protect against threading._shutdown() hang. -# Must be smaller than buildbot "1200 seconds without output" limit. +from . 
import ( + TestsTuple, FilterTuple, State, RunTests, TestsList, Results) +from .cmdline import _parse_args +from .findtests import ( + findtests, split_test_packages, list_cases) +from .logger import Logger +from .single import run_single_test, PROGRESS_MIN_TIME +from .setup import setup_tests, setup_test_dir +from .pgo import PGO_TESTS +from .utils import ( + strip_py_suffix, format_duration, count, printlist, + fix_umask, display_header, + select_temp_dir, make_temp_dir, cleanup_directory) + + +# bpo-38203: maximum delay in seconds to exit python (call py_finalize()). +# used to protect against threading._shutdown() hang. +# must be smaller than buildbot "1200 seconds without output" limit. EXIT_TIMEOUT = 120.0 -EXITCODE_BAD_TEST = 2 -EXITCODE_ENV_CHANGED = 3 -EXITCODE_NO_TESTS_RAN = 4 -EXITCODE_RERUN_FAIL = 5 -EXITCODE_INTERRUPTED = 130 - class Regrtest: """Execute a test suite. @@ -58,306 +53,176 @@ class Regrtest: directly to set the values that would normally be set by flags on the command line. """ - def __init__(self): - # Namespace of command line options - self.ns = None - - # tests - self.tests = [] - self.selected = [] - self.all_runtests: list[RunTests] = [] - - # test results - self.good: list[str] = [] - self.bad: list[str] = [] - self.rerun_bad: list[str] = [] - self.skipped: list[str] = [] - self.resource_denied: list[str] = [] - self.environment_changed: list[str] = [] - self.run_no_tests: list[str] = [] - self.rerun: list[str] = [] - - self.need_rerun: list[TestResult] = [] - self.first_state: str | None = None - self.interrupted = False - self.total_stats = TestStats() - - # used by --slow - self.test_times = [] - - # used by --coverage, trace.Trace instance - self.tracer = None - - # used to display the progress bar "[ 3/100]" - self.start_time = time.perf_counter() - self.test_count_text = '' - self.test_count_width = 1 - - # used by --single + def __init__(self, ns): + # Actions + self.want_header = ns.header + self.want_list_tests = ns.list_tests + self.want_list_cases = ns.list_cases + self.want_wait = ns.wait + self.want_cleanup = ns.cleanup + self.want_rerun = ns.rerun + + # Select tests + if ns.match_tests: + self.match_tests: FilterTuple = tuple(ns.match_tests) + else: + self.match_tests = None + if ns.ignore_tests: + self.ignore_tests: FilterTuple = tuple(ns.ignore_tests) + else: + self.ignore_tests = None + self.cmdline_args = ns.args + self.exclude = ns.exclude + self.fromfile = ns.fromfile + self.use_resources = ns.use_resources + self.starting_test = ns.start + + # Run tests + self.num_processes = ns.use_mp + self.worker_json = ns.worker_json + self.coverage = ns.coverage + self.coverage_dir = ns.coverdir + + # Options to run tests + self.fail_fast = ns.failfast + self.forever = ns.forever + self.quiet = ns.quiet + self.runleaks = ns.runleaks + self.print_slowest = ns.print_slow + self.pgo = ns.pgo + self.pgo_extended = ns.pgo_extended + self.randomize = ns.randomize + self.random_seed = ns.random_seed + self.gc_threshold = ns.threshold + self.output_on_failure = ns.verbose3 + self.verbose = ns.verbose + self.python_executable = ns.python + self.test_dir = ns.testdir + self.timeout = ns.timeout + self.orig_tempdir = ns.tempdir + self.huntrleaks = ns.huntrleaks + self.memlimit = ns.memlimit + + # Configure the exit code + self.fail_env_changed = ns.fail_env_changed + self.fail_rerun = ns.fail_rerun + + # Used by --single + self.single_test_run = ns.single self.next_single_test = None self.next_single_filename = None - # used by --junit-xml - 
self.testsuite_xml = None + # Tests + self.results = Results(xmlpath=ns.xmlpath) + self.first_runtests: RunTests | None = None + self.first_state: str | None = None - # misc - self.win_load_tracker = None + # Misc self.tmp_dir = None + self.logger = Logger(self.pgo) + self.log = self.logger.log - def get_executed(self): - return (set(self.good) | set(self.bad) | set(self.skipped) - | set(self.resource_denied) | set(self.environment_changed) - | set(self.run_no_tests)) - - def accumulate_result(self, result, rerun=False): - fail_env_changed = self.ns.fail_env_changed - test_name = result.test_name - - match result.state: - case State.PASSED: - self.good.append(test_name) - case State.ENV_CHANGED: - self.environment_changed.append(test_name) - case State.SKIPPED: - self.skipped.append(test_name) - case State.RESOURCE_DENIED: - self.resource_denied.append(test_name) - case State.INTERRUPTED: - self.interrupted = True - case State.DID_NOT_RUN: - self.run_no_tests.append(test_name) - case _: - if result.is_failed(fail_env_changed): - self.bad.append(test_name) - self.need_rerun.append(result) - else: - raise ValueError(f"invalid test state: {result.state!r}") - - if result.has_meaningful_duration() and not rerun: - self.test_times.append((result.duration, test_name)) - if result.stats is not None: - self.total_stats.accumulate(result.stats) - if rerun: - self.rerun.append(test_name) - - xml_data = result.xml_data - if xml_data: - import xml.etree.ElementTree as ET - for e in xml_data: - try: - self.testsuite_xml.append(ET.fromstring(e)) - except ET.ParseError: - print(xml_data, file=sys.__stderr__) - raise - - def log(self, line=''): - empty = not line - - # add the system load prefix: "load avg: 1.80 " - load_avg = self.getloadavg() - if load_avg is not None: - line = f"load avg: {load_avg:.2f} {line}" - - # add the timestamp prefix: "0:01:05 " - test_time = time.perf_counter() - self.start_time - - mins, secs = divmod(int(test_time), 60) - hours, mins = divmod(mins, 60) - test_time = "%d:%02d:%02d" % (hours, mins, secs) - - line = f"{test_time} {line}" - if empty: - line = line[:-1] - - print(line, flush=True) - - def display_progress(self, test_index, text): - quiet = self.ns.quiet - pgo = self.ns.pgo - if quiet: - return - - # "[ 51/405/1] test_tcl passed" - line = f"{test_index:{self.test_count_width}}{self.test_count_text}" - fails = len(self.bad) + len(self.environment_changed) - if fails and not pgo: - line = f"{line}/{fails}" - self.log(f"[{line}] {text}") - - def parse_args(self, kwargs): - ns = _parse_args(sys.argv[1:], **kwargs) - - if ns.xmlpath: - support.junit_xml_list = self.testsuite_xml = [] - - strip_py_suffix(ns.args) - - if ns.huntrleaks: - warmup, repetitions, _ = ns.huntrleaks - if warmup < 1 or repetitions < 1: - msg = ("Invalid values for the --huntrleaks/-R parameters. 
The " - "number of warmups and repetitions must be at least 1 " - "each (1:1).") - print(msg, file=sys.stderr, flush=True) - sys.exit(2) + def find_tests(self, tests: TestsList | None, + test_dir: str) -> tuple[TestsTuple, TestsTuple | None]: + if tests is not None: + tests = list(tests) + selected = [] - if ns.tempdir: - ns.tempdir = os.path.expanduser(ns.tempdir) - - self.ns = ns - - def find_tests(self, tests): - ns = self.ns - single = ns.single - fromfile = ns.fromfile - pgo = ns.pgo - exclude = ns.exclude - test_dir = ns.testdir - starting_test = ns.start - randomize = ns.randomize - - self.tests = tests - - if single: + if self.single_test_run: self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') try: with open(self.next_single_filename, 'r') as fp: next_test = fp.read().strip() - self.tests = [next_test] + tests = [next_test] except OSError: pass - if fromfile: - self.tests = [] + if self.fromfile: + tests = [] # regex to match 'test_builtin' in line: # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') - with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp: + with open(os.path.join(os_helper.SAVEDCWD, self.fromfile)) as fp: for line in fp: line = line.split('#', 1)[0] line = line.strip() match = regex.search(line) if match is not None: - self.tests.append(match.group()) + tests.append(match.group()) - strip_py_suffix(self.tests) + strip_py_suffix(tests) - if pgo: + if self.pgo: # add default PGO tests if no tests are specified - setup_pgo_tests(ns) + if not self.cmdline_args and not self.pgo_extended: + # run default set of tests for PGO training + self.cmdline_args[:] = PGO_TESTS exclude_tests = set() - if exclude: - for arg in ns.args: + if self.exclude: + for arg in self.cmdline_args: exclude_tests.add(arg) - ns.args = [] + self.cmdline_args = [] - alltests = findtests(testdir=test_dir, exclude=exclude_tests) + alltests = findtests(testdir=self.test_dir, exclude=exclude_tests) - if not fromfile: - self.selected = self.tests or ns.args - if self.selected: - self.selected = split_test_packages(self.selected) + if not self.fromfile: + selected = tests or self.cmdline_args + if selected: + selected = split_test_packages(selected) else: - self.selected = alltests + selected = alltests else: - self.selected = self.tests + selected = tests - if single: - self.selected = self.selected[:1] + if self.single_test_run: + selected = selected[:1] try: - pos = alltests.index(self.selected[0]) + pos = alltests.index(selected[0]) self.next_single_test = alltests[pos + 1] except IndexError: pass # Remove all the selected tests that precede start if it's set. 
- if starting_test: + if self.starting_test: try: - del self.selected[:self.selected.index(starting_test)] + del selected[:selected.index(self.starting_test)] except ValueError: - print(f"Cannot find starting test: {starting_test}") + print(f"Cannot find starting test: {self.starting_test}") sys.exit(1) - if randomize: - if ns.random_seed is None: - ns.random_seed = random.randrange(10000000) - random.seed(ns.random_seed) - random.shuffle(self.selected) - - def list_tests(self): - for name in self.selected: - print(name) - - def _list_cases(self, suite): - for test in suite: - if isinstance(test, unittest.loader._FailedTest): - continue - if isinstance(test, unittest.TestSuite): - self._list_cases(test) - elif isinstance(test, unittest.TestCase): - if support.match_test(test): - print(test.id()) - - def list_cases(self): - ns = self.ns - test_dir = ns.testdir - support.verbose = False - support.set_match_tests(ns.match_tests, ns.ignore_tests) - - skipped = [] - for test_name in self.selected: - module_name = abs_module_name(test_name, test_dir) - try: - suite = unittest.defaultTestLoader.loadTestsFromName(module_name) - self._list_cases(suite) - except unittest.SkipTest: - skipped.append(test_name) - - if skipped: - sys.stdout.flush() - stderr = sys.stderr - print(file=stderr) - print(count(len(skipped), "test"), "skipped:", file=stderr) - printlist(skipped, file=stderr) - - def get_rerun_match(self, rerun_list) -> MatchTestsDict: - rerun_match_tests = {} - for result in rerun_list: - match_tests = result.get_rerun_match_tests() - # ignore empty match list - if match_tests: - rerun_match_tests[result.test_name] = match_tests - return rerun_match_tests - - def _rerun_failed_tests(self, need_rerun): - # Configure the runner to re-run tests - ns = self.ns - ns.verbose = True - ns.failfast = False - ns.verbose3 = False - ns.forever = False - if ns.use_mp is None: - ns.use_mp = 1 + selected = tuple(selected) + if tests is not None: + tests = tuple(tests) + return (selected, tests) + def _rerun_failed_tests(self, runtests: RunTests) -> None: # Get tests to re-run - tests = [result.test_name for result in need_rerun] - match_tests = self.get_rerun_match(need_rerun) - self.set_tests(tests) - - # Clear previously failed tests - self.rerun_bad.extend(self.bad) - self.bad.clear() - self.need_rerun.clear() + tests, match_tests_dict = self.results.prepare_rerun() # Re-run failed tests - self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses") - runtests = RunTests(tests, match_tests=match_tests, rerun=True) - self.all_runtests.append(runtests) + runtests = runtests.copy( + tests=tests, + rerun=True, + verbose=True, + match_tests_dict=match_tests_dict, + output_on_failure=False, + forever=False, + fail_fast=False) + self.logger.set_tests(runtests) + + if not self.num_processes: + # Always run tests in subprocesses, at least with 1 worker + # process + self.num_processes = 1 + + self.log(f"Re-running {len(runtests.tests)} failed tests " + f"in verbose mode with {self.num_processes} " + f"worker processes") self._run_tests_mp(runtests) - def rerun_failed_tests(self, need_rerun): - if self.ns.python: + def rerun_failed_tests(self, runtests: RunTests): + if self.python_executable: # Temp patch for https://github.com/python/cpython/issues/94052 self.log( "Re-running failed tests is not supported with --python " @@ -366,125 +231,58 @@ def rerun_failed_tests(self, need_rerun): return self.first_state = self.get_tests_state() - print() - self._rerun_failed_tests(need_rerun) - - if 
self.bad: - print(count(len(self.bad), 'test'), "failed again:") - printlist(self.bad) + self._rerun_failed_tests(runtests) - self.display_result() + bad = self.results.bad + if bad: + print(count(len(bad), 'test'), "failed again:") + printlist(bad) - def display_result(self): - pgo = self.ns.pgo - quiet = self.ns.quiet - print_slow = self.ns.print_slow + self.display_result(runtests.tests) + def display_result(self, tests): # If running the test suite for PGO then no one cares about results. - if pgo: + if self.pgo: return - print() - print("== Tests result: %s ==" % self.get_tests_state()) - - if self.interrupted: - print("Test suite interrupted by signal SIGINT.") - - omitted = set(self.selected) - self.get_executed() - if omitted: - print() - print(count(len(omitted), "test"), "omitted:") - printlist(omitted) - - if self.good and not quiet: - print() - if (not self.bad - and not self.skipped - and not self.interrupted - and len(self.good) > 1): - print("All", end=' ') - print(count(len(self.good), "test"), "OK.") - - if print_slow: - self.test_times.sort(reverse=True) - print() - print("10 slowest tests:") - for test_time, test in self.test_times[:10]: - print("- %s: %s" % (test, format_duration(test_time))) - - if self.bad: - print() - print(count(len(self.bad), "test"), "failed:") - printlist(self.bad) - - if self.environment_changed: - print() - print("{} altered the execution environment:".format( - count(len(self.environment_changed), "test"))) - printlist(self.environment_changed) - - if self.skipped and not quiet: - print() - print(count(len(self.skipped), "test"), "skipped:") - printlist(self.skipped) - - if self.resource_denied and not quiet: - print() - print(count(len(self.resource_denied), "test"), "skipped (resource denied):") - printlist(self.resource_denied) - - if self.rerun: - print() - print("%s:" % count(len(self.rerun), "re-run test")) - printlist(self.rerun) - - if self.run_no_tests: - print() - print(count(len(self.run_no_tests), "test"), "run no tests:") - printlist(self.run_no_tests) - - def run_test(self, test_index, test_name, previous_test, save_modules): - text = test_name - if previous_test: - text = '%s -- %s' % (text, previous_test) - self.display_progress(test_index, text) + self.results.display_result( + tests, + fail_env_changed=self.fail_env_changed, + first_state=self.first_state, + quiet=self.quiet, + print_slowest=self.print_slowest) - if self.tracer: + @staticmethod + def run_test(test_name, results, runtests, tracer): + if tracer is not None: # If we're tracing code coverage, then we don't exit with status # if on a false return value from main. 
- cmd = ('result = runtest(self.ns, test_name); ' - 'self.accumulate_result(result)') - ns = dict(locals()) - self.tracer.runctx(cmd, globals=globals(), locals=ns) - result = ns['result'] + cmd = ('result = run_single_test(test_name, runtests); ' + 'results.accumulate_result(result, runtests)') + local_namespace = dict(locals()) + tracer.runctx(cmd, globals=globals(), locals=local_namespace) + result = local_namespace['result'] else: - result = runtest(self.ns, test_name) - self.accumulate_result(result) - - # Unload the newly imported modules (best effort finalization) - for module in sys.modules.keys(): - if module not in save_modules and module.startswith("test."): - support.unload(module) - + result = run_single_test(test_name, runtests) + results.accumulate_result(result, runtests) return result - def run_tests_sequentially(self, runtests): - ns = self.ns - coverage = ns.trace - fail_fast = ns.failfast - fail_env_changed = ns.fail_env_changed - timeout = ns.timeout + def run_tests_sequentially(self, runtests: RunTests): + fail_fast = runtests.fail_fast + fail_env_changed = runtests.fail_env_changed - if coverage: + if self.coverage: import trace - self.tracer = trace.Trace(trace=False, count=True) + tracer = trace.Trace(trace=False, count=True) + else: + tracer = None save_modules = sys.modules.keys() msg = "Run tests sequentially" - if timeout: - msg += " (timeout: %s)" % format_duration(timeout) + if runtests.timeout: + msg += " (timeout: %s)" % format_duration(runtests.timeout) self.log(msg) previous_test = None @@ -492,8 +290,17 @@ def run_tests_sequentially(self, runtests): for test_index, test_name in enumerate(tests_iter, 1): start_time = time.perf_counter() - result = self.run_test(test_index, test_name, - previous_test, save_modules) + text = test_name + if previous_test: + text = '%s -- %s' % (text, previous_test) + self.logger.display_progress(test_index, text, self.results, runtests) + + result = self.run_test(test_name, self.results, runtests, tracer) + + # Unload the newly imported modules (best effort finalization) + for module in sys.modules.keys(): + if module not in save_modules and module.startswith("test."): + support.unload(module) if result.must_stop(fail_fast, fail_env_changed): break @@ -509,129 +316,35 @@ def run_tests_sequentially(self, runtests): if previous_test: print(previous_test) - def display_header(self): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("==", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== Python build:", ' '.join(get_build_info())) - print("== cwd:", os.getcwd()) - cpu_count = os.cpu_count() - if cpu_count: - print("== CPU count:", cpu_count) - print("== encodings: locale=%s, FS=%s" - % (locale.getencoding(), sys.getfilesystemencoding())) - self.display_sanitizers() - - def display_sanitizers(self): - # This makes it easier to remember what to set in your local - # environment when trying to reproduce a sanitizer failure. 
- asan = support.check_sanitizer(address=True) - msan = support.check_sanitizer(memory=True) - ubsan = support.check_sanitizer(ub=True) - sanitizers = [] - if asan: - sanitizers.append("address") - if msan: - sanitizers.append("memory") - if ubsan: - sanitizers.append("undefined behavior") - if not sanitizers: - return - - print(f"== sanitizers: {', '.join(sanitizers)}") - for sanitizer, env_var in ( - (asan, "ASAN_OPTIONS"), - (msan, "MSAN_OPTIONS"), - (ubsan, "UBSAN_OPTIONS"), - ): - options= os.environ.get(env_var) - if sanitizer and options is not None: - print(f"== {env_var}={options!r}") - - def no_tests_run(self): - return not any((self.good, self.bad, self.skipped, self.interrupted, - self.environment_changed)) + return tracer def get_tests_state(self): - fail_env_changed = self.ns.fail_env_changed - - result = [] - if self.bad: - result.append("FAILURE") - elif fail_env_changed and self.environment_changed: - result.append("ENV CHANGED") - elif self.no_tests_run(): - result.append("NO TESTS RAN") - - if self.interrupted: - result.append("INTERRUPTED") - - if not result: - result.append("SUCCESS") - - result = ', '.join(result) - if self.first_state: - result = '%s then %s' % (self.first_state, result) - return result + return self.results.get_state( + fail_env_changed=self.fail_env_changed, + first_state=self.first_state) def _run_tests_mp(self, runtests: RunTests) -> None: - from test.libregrtest.runtest_mp import run_tests_multiprocess - # If we're on windows and this is the parent runner (not a worker), - # track the load average. - if sys.platform == 'win32': - from test.libregrtest.win_utils import WindowsLoadTracker + from .mp_runner import MultiprocessTestRunner - try: - self.win_load_tracker = WindowsLoadTracker() - except PermissionError as error: - # Standard accounts may not have access to the performance - # counters. - print(f'Failed to create WindowsLoadTracker: {error}') + runner = MultiprocessTestRunner(self.results, runtests, + self.logger, self.num_processes) + runner.run_tests() - try: - run_tests_multiprocess(self, runtests) - finally: - if self.win_load_tracker is not None: - self.win_load_tracker.close() - self.win_load_tracker = None - - def set_tests(self, tests): - self.tests = tests - if self.ns.forever: - self.test_count_text = '' - self.test_count_width = 3 - else: - self.test_count_text = '/{}'.format(len(self.tests)) - self.test_count_width = len(self.test_count_text) - 1 + def run_tests(self, runtests: RunTests): + self.logger.set_tests(runtests) - def run_tests(self): - # For a partial run, we do not need to clutter the output. 
- if (self.ns.header - or not(self.ns.pgo or self.ns.quiet or self.ns.single - or self.tests or self.ns.args)): - self.display_header() + setup_tests(runtests) - if self.ns.huntrleaks: - warmup, repetitions, _ = self.ns.huntrleaks - if warmup < 3: - msg = ("WARNING: Running tests with --huntrleaks/-R and less than " - "3 warmup repetitions can give false positives!") - print(msg, file=sys.stdout, flush=True) - - if self.ns.randomize: - print("Using random seed", self.ns.random_seed) - - tests = self.selected - self.set_tests(tests) - runtests = RunTests(tests, forever=self.ns.forever) - self.all_runtests.append(runtests) - if self.ns.use_mp: + if self.num_processes: self._run_tests_mp(runtests) + tracer = None else: - self.run_tests_sequentially(runtests) + tracer = self.run_tests_sequentially(runtests) + return tracer + + def finalize_tests(self, tracer) -> None: + self.display_summary() - def finalize(self): if self.next_single_filename: if self.next_single_test: with open(self.next_single_filename, 'w') as fp: @@ -639,30 +352,27 @@ def finalize(self): else: os.unlink(self.next_single_filename) - if self.tracer: - r = self.tracer.results() - r.write_results(show_missing=True, summary=True, - coverdir=self.ns.coverdir) - - if self.ns.runleaks: + if self.runleaks: os.system("leaks %d" % os.getpid()) - self.save_xml_result() + self.results.write_junit() + + if tracer: + results = tracer.results() + results.write_results(show_missing=True, summary=True, + coverdir=self.coverage_dir) def display_summary(self): - duration = time.perf_counter() - self.start_time - first_runtests = self.all_runtests[0] - # the second runtests (re-run failed tests) disables forever, - # use the first runtests - forever = first_runtests.forever - filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests) + results = self.results + duration = self.logger.get_time() + filtered = (bool(self.match_tests) or bool(self.ignore_tests)) # Total duration print() print("Total duration: %s" % format_duration(duration)) # Total tests - total = self.total_stats + total = results.total_stats text = f'run={total.tests_run:,}' if filtered: text = f"{text} (filtered)" @@ -674,218 +384,174 @@ def display_summary(self): print(f"Total tests: {' '.join(stats)}") # Total test files - all_tests = [self.good, self.bad, self.rerun, - self.skipped, - self.environment_changed, self.run_no_tests] + all_tests = [results.good, results.bad, results.rerun, + results.skipped, + results.environment_changed, results.run_no_tests] run = sum(map(len, all_tests)) text = f'run={run}' - if not forever: - ntest = len(first_runtests.tests) + if not self.forever: + ntest = len(self.first_runtests.tests) text = f"{text}/{ntest}" if filtered: text = f"{text} (filtered)" report = [text] for name, tests in ( - ('failed', self.bad), - ('env_changed', self.environment_changed), - ('skipped', self.skipped), - ('resource_denied', self.resource_denied), - ('rerun', self.rerun), - ('run_no_tests', self.run_no_tests), + ('failed', results.bad), + ('env_changed', results.environment_changed), + ('skipped', results.skipped), + ('resource_denied', results.resource_denied), + ('rerun', results.rerun), + ('run_no_tests', results.run_no_tests), ): if tests: report.append(f'{name}={len(tests)}') print(f"Total test files: {' '.join(report)}") # Result - result = self.get_tests_state() - print(f"Result: {result}") - - def save_xml_result(self): - if not self.ns.xmlpath and not self.testsuite_xml: - return - - import xml.etree.ElementTree as ET - root = 
ET.Element("testsuites") - - # Manually count the totals for the overall summary - totals = {'tests': 0, 'errors': 0, 'failures': 0} - for suite in self.testsuite_xml: - root.append(suite) - for k in totals: - try: - totals[k] += int(suite.get(k, 0)) - except ValueError: - pass - - for k, v in totals.items(): - root.set(k, str(v)) - - xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath) - with open(xmlpath, 'wb') as f: - for s in ET.tostringlist(root): - f.write(s) - - def fix_umask(self): - if support.is_emscripten: - # Emscripten has default umask 0o777, which breaks some tests. - # see https://github.com/emscripten-core/emscripten/issues/17269 - old_mask = os.umask(0) - if old_mask == 0o777: - os.umask(0o027) - else: - os.umask(old_mask) - - def set_temp_dir(self): - if self.ns.tempdir: - self.tmp_dir = self.ns.tempdir - - if not self.tmp_dir: - # When tests are run from the Python build directory, it is best practice - # to keep the test files in a subfolder. This eases the cleanup of leftover - # files using the "make distclean" command. - if sysconfig.is_python_build(): - self.tmp_dir = sysconfig.get_config_var('abs_builddir') - if self.tmp_dir is None: - # bpo-30284: On Windows, only srcdir is available. Using - # abs_builddir mostly matters on UNIX when building Python - # out of the source tree, especially when the source tree - # is read only. - self.tmp_dir = sysconfig.get_config_var('srcdir') - self.tmp_dir = os.path.join(self.tmp_dir, 'build') - else: - self.tmp_dir = tempfile.gettempdir() + state = self.get_tests_state() + print(f"Result: {state}") - self.tmp_dir = os.path.abspath(self.tmp_dir) + def main(self, tests: TestsList | None = None): + strip_py_suffix(self.cmdline_args) - def is_worker(self): - return (self.ns.worker_args is not None) + self.tmp_dir = select_temp_dir(self.orig_tempdir) + fix_umask() - def create_temp_dir(self): - os.makedirs(self.tmp_dir, exist_ok=True) - - # Define a writable temp dir that will be used as cwd while running - # the tests. The name of the dir includes the pid to allow parallel - # testing (see the -j option). - # Emscripten and WASI have stubbed getpid(), Emscripten has only - # milisecond clock resolution. Use randint() instead. - if sys.platform in {"emscripten", "wasi"}: - nounce = random.randint(0, 1_000_000) - else: - nounce = os.getpid() - - if self.is_worker(): - test_cwd = 'test_python_worker_{}'.format(nounce) - else: - test_cwd = 'test_python_{}'.format(nounce) - test_cwd += os_helper.FS_NONASCII - test_cwd = os.path.join(self.tmp_dir, test_cwd) - return test_cwd - - def cleanup(self): - import glob - - path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*') - print("Cleanup %s directory" % self.tmp_dir) - for name in glob.glob(path): - if os.path.isdir(name): - print("Remove directory: %s" % name) - os_helper.rmtree(name) - else: - print("Remove file: %s" % name) - os_helper.unlink(name) - - def main(self, tests=None, **kwargs): - self.parse_args(kwargs) - - self.set_temp_dir() - - self.fix_umask() - - if self.ns.cleanup: - self.cleanup() + if self.want_cleanup: + cleanup_directory(self.tmp_dir) sys.exit(0) - test_cwd = self.create_temp_dir() - try: - # Run the tests in a context manager that temporarily changes the CWD - # to a temporary and writable directory. If it's not possible to - # create or change the CWD, the original CWD will be used. - # The original CWD is available from os_helper.SAVEDCWD. 
+ # Run the tests in a context manager that temporarily changes the + # CWD to a temporary and writable directory. If it's not possible + # to create or change the CWD, the original CWD will be used. The + # original CWD is available from os_helper.SAVEDCWD. + # + # When using multiprocessing, worker processes will use + # test_cwd as their parent temporary directory. So when the + # main process exit, it removes also subdirectories of worker + # processes. + is_worker = (self.worker_json is not None) + test_cwd = make_temp_dir(self.tmp_dir, is_worker) with os_helper.temp_cwd(test_cwd, quiet=True): - # When using multiprocessing, worker processes will use test_cwd - # as their parent temporary directory. So when the main process - # exit, it removes also subdirectories of worker processes. - self.ns.tempdir = test_cwd - - self._main(tests, kwargs) - except SystemExit as exc: + self._main(tests) + finally: # bpo-38203: Python can hang at exit in Py_Finalize(), especially # on threading._shutdown() call: put a timeout if threading_helper.can_start_thread: faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) - sys.exit(exc.code) - - def getloadavg(self): - if self.win_load_tracker is not None: - return self.win_load_tracker.getloadavg() + def get_exitcode(self): + return self.results.get_exitcode( + fail_env_changed=self.fail_env_changed, + fail_rerun=self.fail_rerun) + + def create_run_tests(self, tests): + return RunTests( + tuple(tests), + timeout=self.timeout, + quiet=self.quiet, + fail_env_changed=self.fail_env_changed, + match_tests=self.match_tests, + ignore_tests=self.ignore_tests, + pgo=self.pgo, + pgo_extended=self.pgo_extended, + use_junit=self.results.use_junit(), + test_dir=self.test_dir, + huntrleaks=self.huntrleaks, + memlimit=self.memlimit, + gc_threshold=self.gc_threshold, + use_resources=self.use_resources, + python_executable=self.python_executable, + verbose=self.verbose, + fail_fast=self.fail_fast, + output_on_failure=self.output_on_failure, + forever=self.forever) + + def action_run_tests(self, tests, self_tests): + if self.huntrleaks: + warmup, repetitions, _ = self.huntrleaks + if warmup < 3: + msg = ("WARNING: Running tests with --huntrleaks/-R and less than " + "3 warmup repetitions can give false positives!") + print(msg, file=sys.stdout, flush=True) + print() - if hasattr(os, 'getloadavg'): - return os.getloadavg()[0] + # For a partial run, we do not need to clutter the output. 
+ if (self.want_header + or not(self.pgo or self.quiet or self.single_test_run + or self_tests or self.cmdline_args)): + display_header() + if self.randomize: + print(f"Using random seed: {self.random_seed}") + + if self.num_processes is not None: + if self.pgo: + print("PGO: disable multiprocessing") + self.num_processes = 0 + elif self.num_processes <= 0: + # Use all cores + extras for tests that like to sleep + self.num_processes = (os.cpu_count() or 1) + 2 + else: + self.num_processes = 0 - return None + # run tests + runtests = self.create_run_tests(tests) + tracer = self.run_tests(runtests) + self.first_runtests = runtests + self.display_result(runtests.tests) - def get_exitcode(self): - exitcode = 0 - if self.bad: - exitcode = EXITCODE_BAD_TEST - elif self.interrupted: - exitcode = EXITCODE_INTERRUPTED - elif self.ns.fail_env_changed and self.environment_changed: - exitcode = EXITCODE_ENV_CHANGED - elif self.no_tests_run(): - exitcode = EXITCODE_NO_TESTS_RAN - elif self.rerun and self.ns.fail_rerun: - exitcode = EXITCODE_RERUN_FAIL - return exitcode - - def action_run_tests(self): - self.run_tests() - self.display_result() - - need_rerun = self.need_rerun - if self.ns.rerun and need_rerun: - self.rerun_failed_tests(need_rerun) + # if tests failed, re-run them in verbose mode + if self.want_rerun and self.results.rerun_needed(): + self.rerun_failed_tests(runtests) - self.display_summary() - self.finalize() + self.finalize_tests(tracer) - def _main(self, tests, kwargs): - if self.is_worker(): - from test.libregrtest.runtest_mp import run_tests_worker - run_tests_worker(self.ns.worker_args) + def _main(self, tests: TestsList | None): + worker_json = self.worker_json + if worker_json is not None: + from .worker import worker_process + worker_process(worker_json) return - if self.ns.wait: + if self.want_wait: input("Press any key to continue...") - setup_tests(self.ns) - self.find_tests(tests) + if self.randomize: + if self.random_seed is None: + self.random_seed = random.randrange(100_000_000) + random.seed(self.random_seed) + + setup_test_dir(self.test_dir) + selected, tests = self.find_tests(tests, self.test_dir) + + if self.randomize: + selected = list(selected) + random.shuffle(selected) + selected = tuple(selected) exitcode = 0 - if self.ns.list_tests: - self.list_tests() - elif self.ns.list_cases: - self.list_cases() + if self.want_list_tests: + for name in selected: + print(name) + elif self.want_list_cases: + list_cases( + selected, + test_dir=self.test_dir, + match_tests=self.match_tests, + ignore_tests=self.ignore_tests) else: - self.action_run_tests() - exitcode = self.get_exitcode() + self.logger.start_load_tracker() + try: + self.action_run_tests(selected, tests) + finally: + self.logger.stop_load_tracker() + exitcode = self.get_exitcode() sys.exit(exitcode) -def main(tests=None, **kwargs): +def main(tests: TestsList | None = None, **kwargs): """Run the Python suite.""" - Regrtest().main(tests=tests, **kwargs) + ns = _parse_args(sys.argv[1:], **kwargs) + Regrtest(ns).main(tests) diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/mp_runner.py similarity index 69% rename from Lib/test/libregrtest/runtest_mp.py rename to Lib/test/libregrtest/mp_runner.py index 60089554cab5dd..50a0b7786f9f47 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/mp_runner.py @@ -1,6 +1,6 @@ import dataclasses import faulthandler -import json +import locale import os.path import queue import signal @@ -10,22 +10,15 @@ import threading import time import 
traceback -from typing import NamedTuple, NoReturn, Literal, Any, TextIO +from typing import Literal, TextIO from test import support from test.support import os_helper -from test.support import TestStats -from test.libregrtest.cmdline import Namespace -from test.libregrtest.main import Regrtest -from test.libregrtest.runtest import ( - runtest, TestResult, State, PROGRESS_MIN_TIME, - MatchTests, RunTests) -from test.libregrtest.setup import setup_tests -from test.libregrtest.utils import format_duration, print_warning - -if sys.platform == 'win32': - import locale +from .main import Logger, Results +from .single import TestResult, State, PROGRESS_MIN_TIME, RunTests +from .utils import format_duration, print_warning +from .worker import create_worker_process, USE_PROCESS_GROUP # Display the running tests if nothing happened last N seconds @@ -41,114 +34,6 @@ # Time to wait until a worker completes: should be immediate JOIN_TIMEOUT = 30.0 # seconds -USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) - - -@dataclasses.dataclass(slots=True) -class WorkerJob: - test_name: str - namespace: Namespace - rerun: bool = False - match_tests: MatchTests | None = None - - -class _EncodeWorkerJob(json.JSONEncoder): - def default(self, o: Any) -> dict[str, Any]: - match o: - case WorkerJob(): - result = dataclasses.asdict(o) - result["__worker_job__"] = True - return result - case Namespace(): - result = vars(o) - result["__namespace__"] = True - return result - case _: - return super().default(o) - - -def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]: - if "__worker_job__" in d: - d.pop('__worker_job__') - return WorkerJob(**d) - if "__namespace__" in d: - d.pop('__namespace__') - return Namespace(**d) - else: - return d - - -def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]: - return json.loads(worker_json, - object_hook=_decode_worker_job) - - -def run_test_in_subprocess(worker_job: WorkerJob, - output_file: TextIO, - tmp_dir: str | None = None) -> subprocess.Popen: - ns = worker_job.namespace - python = ns.python - worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob) - - if python is not None: - executable = python - else: - executable = [sys.executable] - cmd = [*executable, *support.args_from_interpreter_flags(), - '-u', # Unbuffered stdout and stderr - '-m', 'test.regrtest', - '--worker-args', worker_args] - - env = dict(os.environ) - if tmp_dir is not None: - env['TMPDIR'] = tmp_dir - env['TEMP'] = tmp_dir - env['TMP'] = tmp_dir - - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. 
- kw = dict( - env=env, - stdout=output_file, - # bpo-45410: Write stderr into stdout to keep messages order - stderr=output_file, - text=True, - close_fds=(os.name != 'nt'), - cwd=os_helper.SAVEDCWD, - ) - if USE_PROCESS_GROUP: - kw['start_new_session'] = True - return subprocess.Popen(cmd, **kw) - - -def run_tests_worker(worker_json: str) -> NoReturn: - worker_job = _parse_worker_args(worker_json) - ns = worker_job.namespace - test_name = worker_job.test_name - rerun = worker_job.rerun - match_tests = worker_job.match_tests - - setup_tests(ns) - - if rerun: - if match_tests: - matching = "matching: " + ", ".join(match_tests) - print(f"Re-running {test_name} in verbose mode ({matching})", flush=True) - else: - print(f"Re-running {test_name} in verbose mode", flush=True) - ns.verbose = True - - if match_tests is not None: - ns.match_tests = match_tests - - result = runtest(ns, test_name) - print() # Force a newline (just in case) - - # Serialize TestResult as dict in JSON - print(json.dumps(result, cls=EncodeTestResult), flush=True) - sys.exit(0) - # We do not use a generator so multiple threads can call next(). class MultiprocessIterator: @@ -173,7 +58,8 @@ def stop(self): self.tests_iter = None -class MultiprocessResult(NamedTuple): +@dataclasses.dataclass(slots=True, frozen=True) +class MultiprocessResult: result: TestResult # bpo-45410: stderr is written into stdout to keep messages order worker_stdout: str | None = None @@ -195,10 +81,8 @@ def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None: self.runtests = runner.runtests self.pending = runner.pending self.output = runner.output - self.ns = runner.ns self.timeout = runner.worker_timeout - self.regrtest = runner.regrtest - self.rerun = runner.rerun + self.log = runner.log self.current_test_name = None self.start_time = None self._popen = None @@ -262,11 +146,10 @@ def mp_result_error( ) -> MultiprocessResult: return MultiprocessResult(test_result, stdout, err_msg) - def _run_process(self, worker_job, output_file: TextIO, + def _run_process(self, runtests: RunTests, output_file: TextIO, tmp_dir: str | None = None) -> int: - self.current_test_name = worker_job.test_name try: - popen = run_test_in_subprocess(worker_job, output_file, tmp_dir) + popen = create_worker_process(runtests, output_file, tmp_dir) self._killed = False self._popen = popen @@ -316,6 +199,8 @@ def _run_process(self, worker_job, output_file: TextIO, self.current_test_name = None def _runtest(self, test_name: str) -> MultiprocessResult: + self.current_test_name = test_name + if sys.platform == 'win32': # gh-95027: When stdout is not a TTY, Python uses the ANSI code # page for the sys.stdout encoding. If the main process runs in a @@ -324,18 +209,18 @@ def _runtest(self, test_name: str) -> MultiprocessResult: else: encoding = sys.stdout.encoding - match_tests = self.runtests.get_match_tests(test_name) + tests = (test_name,) + kwargs = {} + if self.runtests.rerun: + kwargs['match_tests'] = self.runtests.get_match_tests(test_name) + worker_runtests = self.runtests.copy(tests=tests, **kwargs) # gh-94026: Write stdout+stderr to a tempfile as workaround for # non-blocking pipes on Emscripten with NodeJS. 
with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file: - worker_job = WorkerJob(test_name, - namespace=self.ns, - rerun=self.rerun, - match_tests=match_tests) # gh-93353: Check for leaked temporary files in the parent process, # since the deletion of temporary files can happen late during - # Python finalization: too late for libregrtest. + # Python finalization: too late for regrtest. if not support.is_wasi: # Don't check for leaked temporary files and directories if Python is # run on WASI. WASI don't pass environment variables like TMPDIR to @@ -343,12 +228,12 @@ def _runtest(self, test_name: str) -> MultiprocessResult: tmp_dir = tempfile.mkdtemp(prefix="test_python_") tmp_dir = os.path.abspath(tmp_dir) try: - retcode = self._run_process(worker_job, stdout_file, tmp_dir) + retcode = self._run_process(worker_runtests, stdout_file, tmp_dir) finally: tmp_files = os.listdir(tmp_dir) os_helper.rmtree(tmp_dir) else: - retcode = self._run_process(worker_job, stdout_file) + retcode = self._run_process(worker_runtests, stdout_file) tmp_files = () stdout_file.seek(0) @@ -376,8 +261,7 @@ def _runtest(self, test_name: str) -> MultiprocessResult: else: try: # deserialize run_tests_worker() output - result = json.loads(worker_json, - object_hook=decode_test_result) + result = TestResult.from_json(worker_json) except Exception as exc: err_msg = "Failed to parse worker JSON: %s" % exc @@ -395,8 +279,8 @@ def _runtest(self, test_name: str) -> MultiprocessResult: return MultiprocessResult(result, stdout) def run(self) -> None: - fail_fast = self.ns.failfast - fail_env_changed = self.ns.fail_env_changed + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed while not self._stopped: try: try: @@ -444,8 +328,7 @@ def wait_stopped(self, start_time: float) -> None: if not self.is_alive(): break dt = time.monotonic() - start_time - self.regrtest.log(f"Waiting for {self} thread " - f"for {format_duration(dt)}") + self.log(f"Waiting for {self} thread for {format_duration(dt)}") if dt > JOIN_TIMEOUT: print_warning(f"Failed to join {self} in {format_duration(dt)}") break @@ -465,18 +348,18 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]: class MultiprocessTestRunner: - def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None: - ns = regrtest.ns - timeout = ns.timeout - - self.regrtest = regrtest + def __init__(self, results: Results, runtests: RunTests, logger: Logger, + num_processes: int) -> None: + self.results = results self.runtests = runtests - self.rerun = runtests.rerun - self.log = self.regrtest.log - self.ns = ns + self.log = logger.log + self._display_progress = logger.display_progress + self.num_processes = num_processes + self.output: queue.Queue[QueueOutput] = queue.Queue() tests_iter = runtests.iter_tests() self.pending = MultiprocessIterator(tests_iter) + timeout = runtests.timeout if timeout is not None: # Rely on faulthandler to kill a worker process. This timouet is # when faulthandler fails to kill a worker process. 
Give a maximum @@ -486,11 +369,14 @@ def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None: self.worker_timeout = None self.workers = None + def display_progress(self, test_index, test_name): + self._display_progress(test_index, test_name, + self.results, self.runtests) + def start_workers(self) -> None: - use_mp = self.ns.use_mp - timeout = self.ns.timeout + timeout = self.runtests.timeout self.workers = [TestWorkerProcess(index, self) - for index in range(1, use_mp + 1)] + for index in range(1, self.num_processes + 1)] msg = f"Run tests in parallel using {len(self.workers)} child processes" if timeout: msg += (" (timeout: %s, worker timeout: %s)" @@ -508,8 +394,8 @@ def stop_workers(self) -> None: worker.wait_stopped(start_time) def _get_result(self) -> QueueOutput | None: - pgo = self.ns.pgo - use_faulthandler = (self.ns.timeout is not None) + pgo = self.runtests.pgo + use_faulthandler = (self.runtests.timeout is not None) timeout = PROGRESS_UPDATE # bpo-46205: check the status of workers every iteration to avoid @@ -537,8 +423,9 @@ def _get_result(self) -> QueueOutput | None: return None def display_result(self, mp_result: MultiprocessResult) -> None: + runtests = self.runtests result = mp_result.result - pgo = self.ns.pgo + pgo = runtests.pgo text = str(result) if mp_result.err_msg: @@ -549,23 +436,23 @@ def display_result(self, mp_result: MultiprocessResult) -> None: running = get_running(self.workers) if running and not pgo: text += ' -- running: %s' % ', '.join(running) - self.regrtest.display_progress(self.test_index, text) + self.display_progress(self.test_index, text) def _process_result(self, item: QueueOutput) -> bool: """Returns True if test runner must stop.""" - rerun = self.runtests.rerun + results = self.results if item[0]: # Thread got an exception format_exc = item[1] print_warning(f"regrtest worker thread failed: {format_exc}") result = TestResult("", state=State.MULTIPROCESSING_ERROR) - self.regrtest.accumulate_result(result, rerun=rerun) + results.accumulate_result(result, self.runtests) return result self.test_index += 1 mp_result = item[1] result = mp_result.result - self.regrtest.accumulate_result(result, rerun=rerun) + results.accumulate_result(result, self.runtests) self.display_result(mp_result) if mp_result.worker_stdout: @@ -574,9 +461,10 @@ def _process_result(self, item: QueueOutput) -> bool: return result def run_tests(self) -> None: - fail_fast = self.ns.failfast - fail_env_changed = self.ns.fail_env_changed - timeout = self.ns.timeout + results = self.results + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed + timeout = self.runtests.timeout self.start_workers() @@ -592,7 +480,7 @@ def run_tests(self) -> None: break except KeyboardInterrupt: print() - self.regrtest.interrupted = True + results.interrupted = True finally: if timeout is not None: faulthandler.cancel_dump_traceback_later() @@ -601,31 +489,3 @@ def run_tests(self) -> None: # worker when we exit this function self.pending.stop() self.stop_workers() - - -def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None: - MultiprocessTestRunner(regrtest, runtests).run_tests() - - -class EncodeTestResult(json.JSONEncoder): - """Encode a TestResult (sub)class object into a JSON dict.""" - - def default(self, o: Any) -> dict[str, Any]: - if isinstance(o, TestResult): - result = dataclasses.asdict(o) - result["__test_result__"] = o.__class__.__name__ - return result - - return super().default(o) - - -def decode_test_result(d: dict[str, 
Any]) -> TestResult | dict[str, Any]: - """Decode a TestResult (sub)class object from a JSON dict.""" - - if "__test_result__" not in d: - return d - - d.pop('__test_result__') - if d['stats'] is not None: - d['stats'] = TestStats(**d['stats']) - return TestResult(**d) diff --git a/Lib/test/libregrtest/pgo.py b/Lib/test/libregrtest/pgo.py index 42ce5fba7a97c3..cd47b15ca1fcff 100644 --- a/Lib/test/libregrtest/pgo.py +++ b/Lib/test/libregrtest/pgo.py @@ -49,8 +49,3 @@ 'test_xml_etree', 'test_xml_etree_c', ] - -def setup_pgo_tests(ns): - if not ns.args and not ns.pgo_extended: - # run default set of tests for PGO training - ns.args = PGO_TESTS[:] diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py index 206802b60ddcd0..dcd1491c68f89b 100644 --- a/Lib/test/libregrtest/refleak.py +++ b/Lib/test/libregrtest/refleak.py @@ -4,7 +4,8 @@ from inspect import isabstract from test import support from test.support import os_helper -from test.libregrtest.utils import clear_caches + +from .utils import clear_caches try: from _abc import _get_dump @@ -19,7 +20,9 @@ def _get_dump(cls): cls._abc_negative_cache, cls._abc_negative_cache_version) -def dash_R(ns, test_name, test_func): +def runtest_refleak(test_name: str, test_func, + huntrleaks: tuple[int, int, str], + quiet: bool): """Run a test multiple times, looking for reference leaks. Returns: @@ -62,7 +65,7 @@ def dash_R(ns, test_name, test_func): def get_pooled_int(value): return int_pool.setdefault(value, value) - nwarmup, ntracked, fname = ns.huntrleaks + nwarmup, ntracked, fname = huntrleaks fname = os.path.join(os_helper.SAVEDCWD, fname) repcount = nwarmup + ntracked @@ -78,7 +81,7 @@ def get_pooled_int(value): # initialize variables to make pyflakes quiet rc_before = alloc_before = fd_before = interned_before = 0 - if not ns.quiet: + if not quiet: print("beginning", repcount, "repetitions", file=sys.stderr) print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr, flush=True) @@ -102,7 +105,7 @@ def get_pooled_int(value): rc_after = gettotalrefcount() - interned_after * 2 fd_after = fd_count() - if not ns.quiet: + if not quiet: print('.', end='', file=sys.stderr, flush=True) rc_deltas[i] = get_pooled_int(rc_after - rc_before) @@ -114,7 +117,7 @@ def get_pooled_int(value): fd_before = fd_after interned_before = interned_after - if not ns.quiet: + if not quiet: print(file=sys.stderr) # These checkers return False on success, True on failure diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py new file mode 100644 index 00000000000000..01b2458efb5de3 --- /dev/null +++ b/Lib/test/libregrtest/result.py @@ -0,0 +1,185 @@ +import dataclasses +import json +from typing import Any + +from . import TestStats +from .utils import format_duration, normalize_test_name, print_warning + + +# --match and --ignore options: list of patterns +# ('*' joker character can be used) +FilterTuple = tuple[str, ...] 
+ + +# Avoid enum.Enum to reduce the number of imports when tests are run +class State: + PASSED = "PASSED" + FAILED = "FAILED" + SKIPPED = "SKIPPED" + UNCAUGHT_EXC = "UNCAUGHT_EXC" + REFLEAK = "REFLEAK" + ENV_CHANGED = "ENV_CHANGED" + RESOURCE_DENIED = "RESOURCE_DENIED" + INTERRUPTED = "INTERRUPTED" + MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR" + DID_NOT_RUN = "DID_NOT_RUN" + TIMEOUT = "TIMEOUT" + + @staticmethod + def is_failed(state): + return state in { + State.FAILED, + State.UNCAUGHT_EXC, + State.REFLEAK, + State.MULTIPROCESSING_ERROR, + State.TIMEOUT} + + @staticmethod + def has_meaningful_duration(state): + # Consider that the duration is meaningless for these cases. + # For example, if a whole test file is skipped, its duration + # is unlikely to be the duration of executing its tests, + # but just the duration to execute code which skips the test. + return state not in { + State.SKIPPED, + State.RESOURCE_DENIED, + State.INTERRUPTED, + State.MULTIPROCESSING_ERROR, + State.DID_NOT_RUN} + + @staticmethod + def must_stop(state): + return state in { + State.INTERRUPTED, + State.MULTIPROCESSING_ERROR} + + +@dataclasses.dataclass(slots=True) +class TestResult: + test_name: str + state: str | None = None + # Test duration in seconds + duration: float | None = None + xml_data: list[str] | None = None + stats: TestStats | None = None + + # errors and failures copied from support.TestFailedWithDetails + errors: list[tuple[str, str]] | None = None + failures: list[tuple[str, str]] | None = None + + def is_failed(self, fail_env_changed: bool) -> bool: + if self.state == State.ENV_CHANGED: + return fail_env_changed + return State.is_failed(self.state) + + def _format_failed(self): + if self.errors and self.failures: + le = len(self.errors) + lf = len(self.failures) + error_s = "error" + ("s" if le > 1 else "") + failure_s = "failure" + ("s" if lf > 1 else "") + return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" + + if self.errors: + le = len(self.errors) + error_s = "error" + ("s" if le > 1 else "") + return f"{self.test_name} failed ({le} {error_s})" + + if self.failures: + lf = len(self.failures) + failure_s = "failure" + ("s" if lf > 1 else "") + return f"{self.test_name} failed ({lf} {failure_s})" + + return f"{self.test_name} failed" + + def __str__(self) -> str: + match self.state: + case State.PASSED: + return f"{self.test_name} passed" + case State.FAILED: + return self._format_failed() + case State.SKIPPED: + return f"{self.test_name} skipped" + case State.UNCAUGHT_EXC: + return f"{self.test_name} failed (uncaught exception)" + case State.REFLEAK: + return f"{self.test_name} failed (reference leak)" + case State.ENV_CHANGED: + return f"{self.test_name} failed (env changed)" + case State.RESOURCE_DENIED: + return f"{self.test_name} skipped (resource denied)" + case State.INTERRUPTED: + return f"{self.test_name} interrupted" + case State.MULTIPROCESSING_ERROR: + return f"{self.test_name} process crashed" + case State.DID_NOT_RUN: + return f"{self.test_name} ran no tests" + case State.TIMEOUT: + return f"{self.test_name} timed out ({format_duration(self.duration)})" + case _: + raise ValueError("unknown result state: {state!r}") + + def has_meaningful_duration(self): + return State.has_meaningful_duration(self.state) + + def set_env_changed(self): + if self.state is None or self.state == State.PASSED: + self.state = State.ENV_CHANGED + + def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: + if State.must_stop(self.state): + return True + if 
fail_fast and self.is_failed(fail_env_changed): + return True + return False + + def get_rerun_match_tests(self) -> FilterTuple | None: + match_tests = [] + + errors = self.errors or [] + failures = self.failures or [] + for error_list, is_error in ( + (errors, True), + (failures, False), + ): + for full_name, *_ in error_list: + match_name = normalize_test_name(full_name, is_error=is_error) + if match_name is None: + # 'setUpModule (test.test_sys)': don't filter tests + return None + if not match_name: + error_type = "ERROR" if is_error else "FAIL" + print_warning(f"rerun failed to parse {error_type} test name: " + f"{full_name!r}: don't filter tests") + return None + match_tests.append(match_name) + + return tuple(match_tests) + + def as_json_into(self, file): + json.dump(self, file, cls=_EncodeTestResult) + + @staticmethod + def from_json(worker_json): + return json.loads(worker_json, + object_hook=_decode_test_result) + + +class _EncodeTestResult(json.JSONEncoder): + def default(self, o: Any) -> dict[str, Any]: + if isinstance(o, TestResult): + result = dataclasses.asdict(o) + result["__test_result__"] = o.__class__.__name__ + return result + else: + return super().default(o) + + +def _decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]: + if "__test_result__" in d: + d.pop('__test_result__') + if d['stats'] is not None: + d['stats'] = TestStats(**d['stats']) + return TestResult(**d) + else: + return d diff --git a/Lib/test/libregrtest/results.py b/Lib/test/libregrtest/results.py new file mode 100644 index 00000000000000..b65381d7346b40 --- /dev/null +++ b/Lib/test/libregrtest/results.py @@ -0,0 +1,248 @@ +import os + +from test.support import os_helper + +from . import State, TestResult, TestStats, TestsTuple, FilterDict +from .utils import count, printlist, format_duration, print_warning + + +TestsList = list[str] + +EXITCODE_BAD_TEST = 2 +EXITCODE_INTERRUPTED = 130 +EXITCODE_ENV_CHANGED = 3 +EXITCODE_NO_TESTS_RAN = 4 +EXITCODE_RERUN_FAIL = 5 + + +class Results: + def __init__(self, xmlpath: str | None = None): + self.bad: TestsList = [] + self.good: TestsList = [] + self.rerun_bad: TestsList = [] + self.skipped: TestsList = [] + self.resource_denied: TestsList = [] + self.environment_changed: TestsList = [] + self.run_no_tests: TestsList = [] + self.rerun: TestsList = [] + + self.need_rerun: list[TestResult] = [] + self.interrupted = False + self.total_stats = TestStats() + + # used by --slow + self.test_times = [] + + # used by --junit-xml + self.xmlpath = xmlpath + if self.xmlpath: + self.testsuite_xml = [] + else: + self.testsuite_xml = None + + def get_executed(self): + return (set(self.good) | set(self.bad) | set(self.skipped) + | set(self.resource_denied) | set(self.environment_changed) + | set(self.run_no_tests)) + + def no_tests_run(self): + return not any((self.good, self.bad, self.skipped, self.interrupted, + self.environment_changed)) + + def rerun_needed(self): + return bool(self.need_rerun) + + @staticmethod + def get_rerun_match(rerun_list) -> FilterDict: + rerun_match_tests: FilterDict = {} + for result in rerun_list: + match_tests = result.get_rerun_match_tests() + # ignore empty match list + if match_tests: + rerun_match_tests[result.test_name] = match_tests + return rerun_match_tests + + def prepare_rerun(self) -> tuple[TestsTuple, FilterDict]: + tests = tuple(result.test_name for result in self.need_rerun) + match_tests_dict = self.get_rerun_match(self.need_rerun) + + # Clear previously failed tests + self.rerun_bad.extend(self.bad) + 
self.bad.clear() + self.need_rerun.clear() + + return (tests, match_tests_dict) + + def add_junit(self, xml_data: str): + import xml.etree.ElementTree as ET + for item in xml_data: + try: + node = ET.fromstring(item) + self.testsuite_xml.append(node) + except ET.ParseError: + print_warning("failed to parse XML: {xml_data!r}") + raise + + def accumulate_result(self, result, runtests): + fail_env_changed = runtests.fail_env_changed + rerun = runtests.rerun + test_name = result.test_name + + match result.state: + case State.PASSED: + self.good.append(test_name) + case State.ENV_CHANGED: + self.environment_changed.append(test_name) + case State.SKIPPED: + self.skipped.append(test_name) + case State.RESOURCE_DENIED: + self.resource_denied.append(test_name) + case State.INTERRUPTED: + self.interrupted = True + case State.DID_NOT_RUN: + self.run_no_tests.append(test_name) + case _: + if result.is_failed(fail_env_changed): + self.bad.append(test_name) + self.need_rerun.append(result) + else: + raise ValueError(f"invalid test state: {result.state!r}") + + if result.has_meaningful_duration() and not rerun: + self.test_times.append((result.duration, test_name)) + if result.stats is not None: + self.total_stats.accumulate(result.stats) + if rerun: + self.rerun.append(test_name) + + if result.xml_data: + self.add_junit(result.xml_data) + + def use_junit(self): + return (self.xmlpath is not None) + + def write_junit(self): + if not self.use_junit(): + return + if not self.testsuite_xml: + # Don't write empty JUnit file + return + + import xml.etree.ElementTree as ET + root = ET.Element("testsuites") + + # Manually count the totals for the overall summary + totals = {'tests': 0, 'errors': 0, 'failures': 0} + for suite in self.testsuite_xml: + root.append(suite) + for k in totals: + try: + totals[k] += int(suite.get(k, 0)) + except ValueError: + pass + + for k, v in totals.items(): + root.set(k, str(v)) + + xmlpath = os.path.join(os_helper.SAVEDCWD, self.xmlpath) + with open(xmlpath, 'wb') as f: + for s in ET.tostringlist(root): + f.write(s) + + def get_exitcode(self, *, fail_env_changed, fail_rerun): + exitcode = 0 + if self.bad: + exitcode = EXITCODE_BAD_TEST + elif self.interrupted: + exitcode = EXITCODE_INTERRUPTED + elif fail_env_changed and self.environment_changed: + exitcode = EXITCODE_ENV_CHANGED + elif self.no_tests_run(): + exitcode = EXITCODE_NO_TESTS_RAN + elif self.rerun and fail_rerun: + exitcode = EXITCODE_RERUN_FAIL + return exitcode + + def get_state(self, *, fail_env_changed: bool, first_state: bool): + result = [] + if self.bad: + result.append("FAILURE") + elif fail_env_changed and self.environment_changed: + result.append("ENV CHANGED") + elif self.no_tests_run(): + result.append("NO TESTS RAN") + + if self.interrupted: + result.append("INTERRUPTED") + + if not result: + result.append("SUCCESS") + + result = ', '.join(result) + if first_state: + result = '%s then %s' % (first_state, result) + return result + + def display_result(self, selected, *, quiet: bool, print_slowest: bool, + fail_env_changed: bool, first_state: bool): + print() + state = self.get_state( + fail_env_changed=fail_env_changed, + first_state=first_state) + print(f"== Tests result: {state} ==") + + if self.interrupted: + print("Test suite interrupted by signal SIGINT.") + + omitted = set(selected) - self.get_executed() + if omitted: + print() + print(count(len(omitted), "test"), "omitted:") + printlist(omitted) + + if self.good and not quiet: + print() + if (not self.bad + and not self.skipped + and not 
self.interrupted + and len(self.good) > 1): + print("All", end=' ') + print(count(len(self.good), "test"), "OK.") + + if print_slowest: + self.test_times.sort(reverse=True) + print() + print("10 slowest tests:") + for test_time, test in self.test_times[:10]: + print("- %s: %s" % (test, format_duration(test_time))) + + if self.bad: + print() + print(count(len(self.bad), "test"), "failed:") + printlist(self.bad) + + if self.environment_changed: + print() + print("{} altered the execution environment:".format( + count(len(self.environment_changed), "test"))) + printlist(self.environment_changed) + + if self.skipped and not quiet: + print() + print(count(len(self.skipped), "test"), "skipped:") + printlist(self.skipped) + + if self.resource_denied and not quiet: + print() + print(count(len(self.resource_denied), "test"), "skipped (resource denied):") + printlist(self.resource_denied) + + if self.rerun: + print() + print("%s:" % count(len(self.rerun), "re-run test")) + printlist(self.rerun) + + if self.run_no_tests: + print() + print(count(len(self.run_no_tests), "test"), "run no tests:") + printlist(self.run_no_tests) diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py deleted file mode 100644 index 16ae04191da768..00000000000000 --- a/Lib/test/libregrtest/runtest.py +++ /dev/null @@ -1,575 +0,0 @@ -import dataclasses -import doctest -import faulthandler -import gc -import importlib -import io -import os -import sys -import time -import traceback -import unittest - -from test import support -from test.support import TestStats -from test.support import os_helper -from test.support import threading_helper -from test.libregrtest.cmdline import Namespace -from test.libregrtest.save_env import saved_test_environment -from test.libregrtest.utils import clear_caches, format_duration, print_warning - - -MatchTests = list[str] -MatchTestsDict = dict[str, MatchTests] - - -# Avoid enum.Enum to reduce the number of imports when tests are run -class State: - PASSED = "PASSED" - FAILED = "FAILED" - SKIPPED = "SKIPPED" - UNCAUGHT_EXC = "UNCAUGHT_EXC" - REFLEAK = "REFLEAK" - ENV_CHANGED = "ENV_CHANGED" - RESOURCE_DENIED = "RESOURCE_DENIED" - INTERRUPTED = "INTERRUPTED" - MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR" - DID_NOT_RUN = "DID_NOT_RUN" - TIMEOUT = "TIMEOUT" - - @staticmethod - def is_failed(state): - return state in { - State.FAILED, - State.UNCAUGHT_EXC, - State.REFLEAK, - State.MULTIPROCESSING_ERROR, - State.TIMEOUT} - - @staticmethod - def has_meaningful_duration(state): - # Consider that the duration is meaningless for these cases. - # For example, if a whole test file is skipped, its duration - # is unlikely to be the duration of executing its tests, - # but just the duration to execute code which skips the test. - return state not in { - State.SKIPPED, - State.RESOURCE_DENIED, - State.INTERRUPTED, - State.MULTIPROCESSING_ERROR, - State.DID_NOT_RUN} - - @staticmethod - def must_stop(state): - return state in { - State.INTERRUPTED, - State.MULTIPROCESSING_ERROR} - - -# gh-90681: When rerunning tests, we might need to rerun the whole -# class or module suite if some its life-cycle hooks fail. -# Test level hooks are not affected. 
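The exit-code selection in Results.get_exitcode() above follows a strict precedence. The condensed restatement below uses the EXITCODE_* constants defined at the top of results.py; the pick_exitcode() helper itself is made up for illustration.

# Sketch of the precedence applied by Results.get_exitcode().
EXITCODE_BAD_TEST = 2
EXITCODE_ENV_CHANGED = 3
EXITCODE_NO_TESTS_RAN = 4
EXITCODE_RERUN_FAIL = 5
EXITCODE_INTERRUPTED = 130

def pick_exitcode(*, bad, interrupted, env_changed, no_tests_ran, rerun,
                  fail_env_changed=False, fail_rerun=False):
    # A genuine test failure wins over every other outcome.
    if bad:
        return EXITCODE_BAD_TEST
    if interrupted:
        return EXITCODE_INTERRUPTED
    if fail_env_changed and env_changed:
        return EXITCODE_ENV_CHANGED
    if no_tests_ran:
        return EXITCODE_NO_TESTS_RAN
    if rerun and fail_rerun:
        return EXITCODE_RERUN_FAIL
    return 0

print(pick_exitcode(bad=False, interrupted=True, env_changed=False,
                    no_tests_ran=False, rerun=False))  # 130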
-_TEST_LIFECYCLE_HOOKS = frozenset(( - 'setUpClass', 'tearDownClass', - 'setUpModule', 'tearDownModule', -)) - -def normalize_test_name(test_full_name, *, is_error=False): - short_name = test_full_name.split(" ")[0] - if is_error and short_name in _TEST_LIFECYCLE_HOOKS: - if test_full_name.startswith(('setUpModule (', 'tearDownModule (')): - # if setUpModule() or tearDownModule() failed, don't filter - # tests with the test file name, don't use use filters. - return None - - # This means that we have a failure in a life-cycle hook, - # we need to rerun the whole module or class suite. - # Basically the error looks like this: - # ERROR: setUpClass (test.test_reg_ex.RegTest) - # or - # ERROR: setUpModule (test.test_reg_ex) - # So, we need to parse the class / module name. - lpar = test_full_name.index('(') - rpar = test_full_name.index(')') - return test_full_name[lpar + 1: rpar].split('.')[-1] - return short_name - - -@dataclasses.dataclass(slots=True) -class TestResult: - test_name: str - state: str | None = None - # Test duration in seconds - duration: float | None = None - xml_data: list[str] | None = None - stats: TestStats | None = None - - # errors and failures copied from support.TestFailedWithDetails - errors: list[tuple[str, str]] | None = None - failures: list[tuple[str, str]] | None = None - - def is_failed(self, fail_env_changed: bool) -> bool: - if self.state == State.ENV_CHANGED: - return fail_env_changed - return State.is_failed(self.state) - - def _format_failed(self): - if self.errors and self.failures: - le = len(self.errors) - lf = len(self.failures) - error_s = "error" + ("s" if le > 1 else "") - failure_s = "failure" + ("s" if lf > 1 else "") - return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" - - if self.errors: - le = len(self.errors) - error_s = "error" + ("s" if le > 1 else "") - return f"{self.test_name} failed ({le} {error_s})" - - if self.failures: - lf = len(self.failures) - failure_s = "failure" + ("s" if lf > 1 else "") - return f"{self.test_name} failed ({lf} {failure_s})" - - return f"{self.test_name} failed" - - def __str__(self) -> str: - match self.state: - case State.PASSED: - return f"{self.test_name} passed" - case State.FAILED: - return self._format_failed() - case State.SKIPPED: - return f"{self.test_name} skipped" - case State.UNCAUGHT_EXC: - return f"{self.test_name} failed (uncaught exception)" - case State.REFLEAK: - return f"{self.test_name} failed (reference leak)" - case State.ENV_CHANGED: - return f"{self.test_name} failed (env changed)" - case State.RESOURCE_DENIED: - return f"{self.test_name} skipped (resource denied)" - case State.INTERRUPTED: - return f"{self.test_name} interrupted" - case State.MULTIPROCESSING_ERROR: - return f"{self.test_name} process crashed" - case State.DID_NOT_RUN: - return f"{self.test_name} ran no tests" - case State.TIMEOUT: - return f"{self.test_name} timed out ({format_duration(self.duration)})" - case _: - raise ValueError("unknown result state: {state!r}") - - def has_meaningful_duration(self): - return State.has_meaningful_duration(self.state) - - def set_env_changed(self): - if self.state is None or self.state == State.PASSED: - self.state = State.ENV_CHANGED - - def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: - if State.must_stop(self.state): - return True - if fail_fast and self.is_failed(fail_env_changed): - return True - return False - - def get_rerun_match_tests(self): - match_tests = [] - - errors = self.errors or [] - failures = self.failures or [] - for 
error_list, is_error in ( - (errors, True), - (failures, False), - ): - for full_name, *_ in error_list: - match_name = normalize_test_name(full_name, is_error=is_error) - if match_name is None: - # 'setUpModule (test.test_sys)': don't filter tests - return None - if not match_name: - error_type = "ERROR" if is_error else "FAIL" - print_warning(f"rerun failed to parse {error_type} test name: " - f"{full_name!r}: don't filter tests") - return None - match_tests.append(match_name) - - return match_tests - - -@dataclasses.dataclass(slots=True, frozen=True) -class RunTests: - tests: list[str] - match_tests: MatchTestsDict | None = None - rerun: bool = False - forever: bool = False - - def get_match_tests(self, test_name) -> MatchTests | None: - if self.match_tests is not None: - return self.match_tests.get(test_name, None) - else: - return None - - def iter_tests(self): - tests = tuple(self.tests) - if self.forever: - while True: - yield from tests - else: - yield from tests - - -# Minimum duration of a test to display its duration or to mention that -# the test is running in background -PROGRESS_MIN_TIME = 30.0 # seconds - -#If these test directories are encountered recurse into them and treat each -# test_ .py or dir as a separate test module. This can increase parallelism. -# Beware this can't generally be done for any directory with sub-tests as the -# __init__.py may do things which alter what tests are to be run. - -SPLITTESTDIRS = { - "test_asyncio", - "test_concurrent_futures", - "test_multiprocessing_fork", - "test_multiprocessing_forkserver", - "test_multiprocessing_spawn", -} - - -def findtestdir(path=None): - return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir - - -def findtests(*, testdir=None, exclude=(), - split_test_dirs=SPLITTESTDIRS, base_mod=""): - """Return a list of all applicable test modules.""" - testdir = findtestdir(testdir) - tests = [] - for name in os.listdir(testdir): - mod, ext = os.path.splitext(name) - if (not mod.startswith("test_")) or (mod in exclude): - continue - if mod in split_test_dirs: - subdir = os.path.join(testdir, mod) - mod = f"{base_mod or 'test'}.{mod}" - tests.extend(findtests(testdir=subdir, exclude=exclude, - split_test_dirs=split_test_dirs, base_mod=mod)) - elif ext in (".py", ""): - tests.append(f"{base_mod}.{mod}" if base_mod else mod) - return sorted(tests) - - -def split_test_packages(tests, *, testdir=None, exclude=(), - split_test_dirs=SPLITTESTDIRS): - testdir = findtestdir(testdir) - splitted = [] - for name in tests: - if name in split_test_dirs: - subdir = os.path.join(testdir, name) - splitted.extend(findtests(testdir=subdir, exclude=exclude, - split_test_dirs=split_test_dirs, - base_mod=name)) - else: - splitted.append(name) - return splitted - - -def abs_module_name(test_name: str, test_dir: str | None) -> str: - if test_name.startswith('test.') or test_dir: - return test_name - else: - # Import it from the test package - return 'test.' + test_name - - -def setup_support(ns: Namespace): - support.PGO = ns.pgo - support.PGO_EXTENDED = ns.pgo_extended - support.set_match_tests(ns.match_tests, ns.ignore_tests) - support.failfast = ns.failfast - support.verbose = ns.verbose - if ns.xmlpath: - support.junit_xml_list = [] - else: - support.junit_xml_list = None - - -def _runtest(result: TestResult, ns: Namespace) -> None: - # Capture stdout and stderr, set faulthandler timeout, - # and create JUnit XML report. 
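The comment above mentions the faulthandler timeout: while a test runs, faulthandler.dump_traceback_later() arms a watchdog that dumps every thread's traceback and, with exit=True, aborts the process if the test does not finish in time. A small self-contained demonstration; the 5-second timeout and the sleep are arbitrary values chosen for the example.

# The watchdog never fires here because the "test" finishes well within
# the timeout; a real hang would dump tracebacks and exit the process.
import faulthandler
import time

faulthandler.dump_traceback_later(5.0, exit=True)
try:
    time.sleep(0.1)   # stand-in for running one test module
finally:
    faulthandler.cancel_dump_traceback_later()
print("finished before the watchdog fired")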
- verbose = ns.verbose - output_on_failure = ns.verbose3 - timeout = ns.timeout - - use_timeout = ( - timeout is not None and threading_helper.can_start_thread - ) - if use_timeout: - faulthandler.dump_traceback_later(timeout, exit=True) - - try: - setup_support(ns) - - if output_on_failure: - support.verbose = True - - stream = io.StringIO() - orig_stdout = sys.stdout - orig_stderr = sys.stderr - print_warning = support.print_warning - orig_print_warnings_stderr = print_warning.orig_stderr - - output = None - try: - sys.stdout = stream - sys.stderr = stream - # print_warning() writes into the temporary stream to preserve - # messages order. If support.environment_altered becomes true, - # warnings will be written to sys.stderr below. - print_warning.orig_stderr = stream - - _runtest_env_changed_exc(result, ns, display_failure=False) - # Ignore output if the test passed successfully - if result.state != State.PASSED: - output = stream.getvalue() - finally: - sys.stdout = orig_stdout - sys.stderr = orig_stderr - print_warning.orig_stderr = orig_print_warnings_stderr - - if output is not None: - sys.stderr.write(output) - sys.stderr.flush() - else: - # Tell tests to be moderately quiet - support.verbose = verbose - _runtest_env_changed_exc(result, ns, display_failure=not verbose) - - xml_list = support.junit_xml_list - if xml_list: - import xml.etree.ElementTree as ET - result.xml_data = [ET.tostring(x).decode('us-ascii') - for x in xml_list] - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - support.junit_xml_list = None - - -def runtest(ns: Namespace, test_name: str) -> TestResult: - """Run a single test. - - ns -- regrtest namespace of options - test_name -- the name of the test - - Returns a TestResult. - - If ns.xmlpath is not None, xml_data is a list containing each - generated testsuite element. - """ - start_time = time.perf_counter() - result = TestResult(test_name) - try: - _runtest(result, ns) - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - result.state = State.UNCAUGHT_EXC - result.duration = time.perf_counter() - start_time - return result - - -def run_unittest(test_mod): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(test_mod) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - return support.run_unittest(tests) - - -def save_env(ns: Namespace, test_name: str): - return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) - - -def regrtest_runner(result, test_func, ns) -> None: - # Run test_func(), collect statistics, and detect reference and memory - # leaks. 
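The "collect statistics" step mentioned in the comment above is implemented a few lines below as a structural match on the runner's return value. The toy version here only shows the dispatch mechanism; the Stats and RunnerResult classes are stand-ins, not the real TestStats or unittest.TestResult API.

# Class patterns select a branch by the runtime type of the result object,
# just like the match in regrtest_runner() below does for TestStats,
# unittest.TestResult and doctest.TestResults.
import dataclasses

@dataclasses.dataclass
class Stats:
    tests_run: int

class RunnerResult:
    testsRun = 3

def to_stats(test_result):
    match test_result:
        case Stats():
            return test_result
        case RunnerResult():
            return Stats(tests_run=test_result.testsRun)
        case None:
            return None
        case _:
            raise TypeError(f"unknown result type: {type(test_result)}")

print(to_stats(RunnerResult()))  # Stats(tests_run=3)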
- if ns.huntrleaks: - from test.libregrtest.refleak import dash_R - refleak, test_result = dash_R(ns, result.test_name, test_func) - else: - test_result = test_func() - refleak = False - - if refleak: - result.state = State.REFLEAK - - match test_result: - case TestStats(): - stats = test_result - case unittest.TestResult(): - stats = TestStats.from_unittest(test_result) - case doctest.TestResults(): - stats = TestStats.from_doctest(test_result) - case None: - print_warning(f"{result.test_name} test runner returned None: {test_func}") - stats = None - case _: - print_warning(f"Unknown test result type: {type(test_result)}") - stats = None - - result.stats = stats - - -# Storage of uncollectable objects -FOUND_GARBAGE = [] - - -def _load_run_test(result: TestResult, ns: Namespace) -> None: - # Load the test function, run the test function. - module_name = abs_module_name(result.test_name, ns.testdir) - - # Remove the module from sys.module to reload it if it was already imported - sys.modules.pop(module_name, None) - - test_mod = importlib.import_module(module_name) - - if hasattr(test_mod, "test_main"): - # https://github.com/python/cpython/issues/89392 - raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest") - def test_func(): - return run_unittest(test_mod) - - try: - with save_env(ns, result.test_name): - regrtest_runner(result, test_func, ns) - finally: - # First kill any dangling references to open files etc. - # This can also issue some ResourceWarnings which would otherwise get - # triggered during the following test run, and possibly produce - # failures. - support.gc_collect() - - remove_testfn(result.test_name, ns.verbose) - - if gc.garbage: - support.environment_altered = True - print_warning(f"{result.test_name} created {len(gc.garbage)} " - f"uncollectable object(s)") - - # move the uncollectable objects somewhere, - # so we don't see them again - FOUND_GARBAGE.extend(gc.garbage) - gc.garbage.clear() - - support.reap_children() - - -def _runtest_env_changed_exc(result: TestResult, ns: Namespace, - display_failure: bool = True) -> None: - # Detect environment changes, handle exceptions. 
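A few lines above, _load_run_test() checks gc.garbage after each test and reports uncollectable objects as an environment alteration. Genuinely uncollectable objects are rare on modern Python, so this small demo uses gc.DEBUG_SAVEALL purely as a stand-in to make the collector populate gc.garbage.

import gc

gc.set_debug(gc.DEBUG_SAVEALL)   # keep collected objects in gc.garbage
class Node:
    pass
a, b = Node(), Node()
a.other, b.other = b, a          # reference cycle
del a, b
gc.collect()
print(f"found {len(gc.garbage)} object(s) in gc.garbage")
gc.garbage.clear()
gc.set_debug(0)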
- - # Reset the environment_altered flag to detect if a test altered - # the environment - support.environment_altered = False - - if ns.pgo: - display_failure = False - - test_name = result.test_name - try: - clear_caches() - support.gc_collect() - - with save_env(ns, test_name): - _load_run_test(result, ns) - except support.ResourceDenied as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - result.state = State.RESOURCE_DENIED - return - except unittest.SkipTest as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - result.state = State.SKIPPED - return - except support.TestFailedWithDetails as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - result.state = State.FAILED - result.errors = exc.errors - result.failures = exc.failures - result.stats = exc.stats - return - except support.TestFailed as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - result.state = State.FAILED - result.stats = exc.stats - return - except support.TestDidNotRun: - result.state = State.DID_NOT_RUN - return - except KeyboardInterrupt: - print() - result.state = State.INTERRUPTED - return - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - result.state = State.UNCAUGHT_EXC - return - - if support.environment_altered: - result.set_env_changed() - # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) - if result.state is None: - result.state = State.PASSED - - -def remove_testfn(test_name: str, verbose: int) -> None: - # Try to clean up os_helper.TESTFN if left behind. - # - # While tests shouldn't leave any files or directories behind, when a test - # fails that can be tedious for it to arrange. The consequences can be - # especially nasty on Windows, since if a test leaves a file open, it - # cannot be deleted by name (while there's nothing we can do about that - # here either, we can display the name of the offending test, which is a - # real help). - name = os_helper.TESTFN - if not os.path.exists(name): - return - - if os.path.isdir(name): - import shutil - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise RuntimeError(f"os.path says {name!r} exists but is neither " - f"directory nor file") - - if verbose: - print_warning(f"{test_name} left behind {kind} {name!r}") - support.environment_altered = True - - try: - import stat - # fix possible permissions problems that might prevent cleanup - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as exc: - print_warning(f"{test_name} left behind {kind} {name!r} " - f"and it couldn't be removed: {exc}") diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py new file mode 100644 index 00000000000000..ec6cabf7284001 --- /dev/null +++ b/Lib/test/libregrtest/runtests.py @@ -0,0 +1,81 @@ +import dataclasses +import json +from typing import Any + +from . import FilterTuple + + +TestsTuple = tuple[str, ...] 
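Both TestResult above and the RunTests class added below serialize themselves to JSON by dumping the dataclass as a dict that carries a marker key, then rebuilding the object in an object_hook. A minimal self-contained sketch of that pattern; the Point class and "__point__" marker are made up for illustration.

import dataclasses
import json
from typing import Any

@dataclasses.dataclass
class Point:
    x: int
    y: int

class _Encode(json.JSONEncoder):
    def default(self, o: Any) -> dict[str, Any]:
        # Unknown objects reach default(); tag the dict so the decoder
        # knows which class to rebuild.
        if isinstance(o, Point):
            return {**dataclasses.asdict(o), "__point__": True}
        return super().default(o)

def _decode(d: dict[str, Any]):
    if d.pop("__point__", False):
        return Point(**d)
    return d

text = json.dumps(Point(1, 2), cls=_Encode)
assert json.loads(text, object_hook=_decode) == Point(1, 2)
print(text)  # {"x": 1, "y": 2, "__point__": true}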
+FilterDict = dict[str, FilterTuple] + + +@dataclasses.dataclass(slots=True, frozen=True) +class RunTests: + tests: TestsTuple + # --match option + match_tests: FilterTuple | None = None + # --ignore option + ignore_tests: FilterTuple | None = None + # used by --rerun + match_tests_dict: FilterDict | None = None + rerun: bool = False + forever: bool = False + use_junit: bool = False + fail_env_changed: bool = False + fail_fast: bool = False + pgo: bool = False + pgo_extended: bool = False + verbose: int = False + quiet: bool = False + timeout: float | None = None + # --verbose3 option + output_on_failure: bool = False + test_dir: str | None = None + huntrleaks: tuple[int, int, str] | None = None + memlimit: str | None = None + gc_threshold: int | None = None + use_resources: list[str] = dataclasses.field(default_factory=list) + python_executable: list[str] | None = None + + def get_match_tests(self, test_name: str) -> FilterTuple | None: + if self.match_tests_dict is not None: + return self.match_tests_dict.get(test_name, None) + else: + return None + + def iter_tests(self): + if self.forever: + while True: + yield from self.tests + else: + yield from self.tests + + def copy(self, **override): + state = dataclasses.asdict(self) + state.update(override) + return RunTests(**state) + + def as_json(self): + return json.dumps(self, cls=_EncodeRunTests) + + def from_json(worker_json): + return json.loads(worker_json, object_hook=_decode_runtests) + + + +class _EncodeRunTests(json.JSONEncoder): + def default(self, o: Any) -> dict[str, Any]: + if isinstance(o, RunTests): + result = dataclasses.asdict(o) + result["__runtests__"] = True + return result + else: + return super().default(o) + + +def _decode_runtests(d: dict[str, Any]) -> RunTests | dict[str, Any]: + if "__runtests__" in d: + d.pop('__runtests__') + return RunTests(**d) + else: + return d diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py index 164fe9806b5f0d..be94acf3246dcc 100644 --- a/Lib/test/libregrtest/save_env.py +++ b/Lib/test/libregrtest/save_env.py @@ -5,7 +5,8 @@ import threading from test import support from test.support import os_helper -from test.libregrtest.utils import print_warning + +from .utils import print_warning class SkipTestEnvironment(Exception): diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py index b76bece7ca08b5..fabc314cd85244 100644 --- a/Lib/test/libregrtest/setup.py +++ b/Lib/test/libregrtest/setup.py @@ -11,14 +11,69 @@ except ImportError: gc = None -from test.libregrtest.utils import (setup_unraisable_hook, - setup_threading_excepthook) +from . import RunTests +from .utils import setup_unraisable_hook, setup_threading_excepthook -UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD" +UNICODE_GUARD_ENV = "PYTHON_REGRTEST_UNICODE_GUARD" -def setup_tests(ns): +def setup_test_dir(testdir): + if testdir: + # Prepend test directory to sys.path, so runtest() will be able + # to locate tests + sys.path.insert(0, os.path.abspath(testdir)) + + +def replace_stdout(): + """Set stdout encoder error handler to backslashreplace (as stderr error + handler) to avoid UnicodeEncodeError when printing a traceback""" + stdout = sys.stdout + try: + fd = stdout.fileno() + except ValueError: + # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper + # object. Leaving sys.stdout unchanged. + # + # Catch ValueError to catch io.UnsupportedOperation on TextIOBase + # and ValueError on a closed stream. 
+ return + + sys.stdout = open(fd, 'w', + encoding=stdout.encoding, + errors="backslashreplace", + closefd=False, + newline='\n') + + def restore_stdout(): + sys.stdout.close() + sys.stdout = stdout + atexit.register(restore_stdout) + + +def _adjust_resource_limits(): + """Adjust the system resource limits (ulimit) if needed.""" + try: + import resource + from resource import RLIMIT_NOFILE + except ImportError: + return + fd_limit, max_fds = resource.getrlimit(RLIMIT_NOFILE) + # On macOS the default fd limit is sometimes too low (256) for our + # test suite to succeed. Raise it to something more reasonable. + # 1024 is a common Linux default. + desired_fds = 1024 + if fd_limit < desired_fds and fd_limit < max_fds: + new_fd_limit = min(desired_fds, max_fds) + try: + resource.setrlimit(RLIMIT_NOFILE, (new_fd_limit, max_fds)) + print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}") + except (ValueError, OSError) as err: + print(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " + f"{new_fd_limit}: {err}.") + + +def _setup_tests(): try: stderr_fd = sys.__stderr__.fileno() except (ValueError, AttributeError): @@ -44,11 +99,6 @@ def setup_tests(ns): replace_stdout() support.record_original_stdout(sys.stdout) - if ns.testdir: - # Prepend test directory to sys.path, so runtest() will be able - # to locate tests - sys.path.insert(0, os.path.abspath(ns.testdir)) - # Some times __path__ and __file__ are not absolute (e.g. while running from # Lib/) and, if we change the CWD to run the tests in a temporary dir, some # imports might fail. This affects only the modules imported before os.chdir(). @@ -66,19 +116,6 @@ def setup_tests(ns): if getattr(module, '__file__', None): module.__file__ = os.path.abspath(module.__file__) - if ns.huntrleaks: - unittest.BaseTestSuite._cleanup = False - - if ns.memlimit is not None: - support.set_memlimit(ns.memlimit) - - if ns.threshold is not None: - gc.set_threshold(ns.threshold) - - support.suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2) - - support.use_resources = ns.use_resources - if hasattr(sys, 'addaudithook'): # Add an auditing hook for all tests to ensure PySys_Audit is tested def _test_audit_hook(name, args): @@ -88,21 +125,6 @@ def _test_audit_hook(name, args): setup_unraisable_hook() setup_threading_excepthook() - if ns.timeout is not None: - # For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT - support.SHORT_TIMEOUT = max(support.SHORT_TIMEOUT, ns.timeout / 40) - support.LONG_TIMEOUT = max(support.LONG_TIMEOUT, ns.timeout / 4) - - # If --timeout is short: reduce timeouts - support.LOOPBACK_TIMEOUT = min(support.LOOPBACK_TIMEOUT, ns.timeout) - support.INTERNET_TIMEOUT = min(support.INTERNET_TIMEOUT, ns.timeout) - support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, ns.timeout) - support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, ns.timeout) - - if ns.xmlpath: - from test.support.testresult import RegressionTestResult - RegressionTestResult.USE_XML = True - # Ensure there's a non-ASCII character in env vars at all times to force # tests consider this case. See BPO-44647 for details. 
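replace_stdout() above re-opens sys.stdout with errors="backslashreplace" so that printing a traceback containing an unencodable character cannot itself raise UnicodeEncodeError. A small demonstration with an in-memory ASCII stream; the stream and the message are made up for the example.

import io

raw = io.BytesIO()
ascii_stdout = io.TextIOWrapper(raw, encoding="ascii",
                                errors="backslashreplace", newline="\n")
print("crash in test_\u00e9", file=ascii_stdout)
ascii_stdout.flush()
print(raw.getvalue())  # b'crash in test_\\xe9\n' instead of an exception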
if TESTFN_UNDECODABLE and os.supports_bytes_environ: @@ -111,49 +133,42 @@ def _test_audit_hook(name, args): os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII) -def replace_stdout(): - """Set stdout encoder error handler to backslashreplace (as stderr error - handler) to avoid UnicodeEncodeError when printing a traceback""" - stdout = sys.stdout - try: - fd = stdout.fileno() - except ValueError: - # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper - # object. Leaving sys.stdout unchanged. - # - # Catch ValueError to catch io.UnsupportedOperation on TextIOBase - # and ValueError on a closed stream. - return +def setup_support(runtests: RunTests) -> None: + support.PGO = runtests.pgo + support.PGO_EXTENDED = runtests.pgo_extended + support.set_match_tests(runtests.match_tests, runtests.ignore_tests) + support.failfast = runtests.fail_fast + support.verbose = runtests.verbose - sys.stdout = open(fd, 'w', - encoding=stdout.encoding, - errors="backslashreplace", - closefd=False, - newline='\n') - def restore_stdout(): - sys.stdout.close() - sys.stdout = stdout - atexit.register(restore_stdout) +def setup_tests(runtests: RunTests): + _setup_tests() + + if runtests.huntrleaks: + unittest.BaseTestSuite._cleanup = False + if runtests.memlimit is not None: + support.set_memlimit(runtests.memlimit) -def _adjust_resource_limits(): - """Adjust the system resource limits (ulimit) if needed.""" - try: - import resource - from resource import RLIMIT_NOFILE - except ImportError: - return - fd_limit, max_fds = resource.getrlimit(RLIMIT_NOFILE) - # On macOS the default fd limit is sometimes too low (256) for our - # test suite to succeed. Raise it to something more reasonable. - # 1024 is a common Linux default. - desired_fds = 1024 - if fd_limit < desired_fds and fd_limit < max_fds: - new_fd_limit = min(desired_fds, max_fds) - try: - resource.setrlimit(RLIMIT_NOFILE, (new_fd_limit, max_fds)) - print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}") - except (ValueError, OSError) as err: - print(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " - f"{new_fd_limit}: {err}.") + if runtests.gc_threshold is not None: + gc.set_threshold(runtests.gc_threshold) + + support.suppress_msvcrt_asserts(runtests.verbose and runtests.verbose >= 2) + + support.use_resources = runtests.use_resources + + timeout = runtests.timeout + if timeout is not None: + # For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT + support.SHORT_TIMEOUT = max(support.SHORT_TIMEOUT, timeout / 40) + support.LONG_TIMEOUT = max(support.LONG_TIMEOUT, timeout / 4) + + # If --timeout is short: reduce timeouts + support.LOOPBACK_TIMEOUT = min(support.LOOPBACK_TIMEOUT, timeout) + support.INTERNET_TIMEOUT = min(support.INTERNET_TIMEOUT, timeout) + support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout) + support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout) + + if runtests.use_junit: + from test.support.testresult import RegressionTestResult + RegressionTestResult.USE_XML = True diff --git a/Lib/test/libregrtest/single.py b/Lib/test/libregrtest/single.py new file mode 100644 index 00000000000000..c51e533ca479eb --- /dev/null +++ b/Lib/test/libregrtest/single.py @@ -0,0 +1,251 @@ +import doctest +import faulthandler +import gc +import importlib +import sys +import time +import traceback +import types +import unittest + +from test import support +from test.support import threading_helper + +from . 
import TestStats, State, TestResult, RunTests +from .setup import setup_support +from .save_env import saved_test_environment +from .utils import ( + clear_caches, print_warning, + abs_module_name, remove_testfn, capture_output) + + +# Minimum duration of a test to display its duration or to mention that +# the test is running in background +PROGRESS_MIN_TIME = 30.0 # seconds + + +def _run_unittest(test_mod): + loader = unittest.TestLoader() + tests = loader.loadTestsFromModule(test_mod) + for error in loader.errors: + print(error, file=sys.stderr) + if loader.errors: + raise Exception("errors while loading tests") + return support.run_unittest(tests) + + +def _save_env(test_name: str, runtests: RunTests): + return saved_test_environment(test_name, runtests.verbose, runtests.quiet, + pgo=runtests.pgo) + + +def _regrtest_runner(result: TestResult, test_mod: types.ModuleType, + runtests: RunTests) -> None: + # Run test_func(), collect statistics, and detect reference and memory + # leaks. + + if runtests.huntrleaks: + from .refleak import runtest_refleak + def test_func(): + return _run_unittest(test_mod) + leak, test_result = runtest_refleak(result.test_name, test_func, + runtests.huntrleaks, runtests.quiet) + else: + test_result = _run_unittest(test_mod) + leak = False + + if leak: + result.state = State.REFLEAK + + match test_result: + case TestStats(): + stats = test_result + case unittest.TestResult(): + stats = TestStats.from_unittest(test_result) + case doctest.TestResults(): + stats = TestStats.from_doctest(test_result) + case None: + print_warning(f"{result.test_name} test runner returned None: {test_func}") + stats = None + case _: + print_warning(f"Unknown test result type: {type(test_result)}") + stats = None + + result.stats = stats + + +# Storage of uncollectable objects +FOUND_GARBAGE = [] + + +def _load_run_test(result: TestResult, runtests: RunTests) -> None: + # Load the test function, run the test function. + module_name = abs_module_name(result.test_name, runtests.test_dir) + + # Remove the module from sys.module to reload it if it was already imported + sys.modules.pop(module_name, None) + + test_mod = importlib.import_module(module_name) + + if hasattr(test_mod, "test_main"): + # https://github.com/python/cpython/issues/89392 + raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest") + + try: + with _save_env(result.test_name, runtests): + _regrtest_runner(result, test_mod, runtests) + finally: + # First kill any dangling references to open files etc. + # This can also issue some ResourceWarnings which would otherwise get + # triggered during the following test run, and possibly produce + # failures. + support.gc_collect() + + remove_testfn(result.test_name, runtests.verbose) + + if gc.garbage: + support.environment_altered = True + print_warning(f"{result.test_name} created {len(gc.garbage)} " + f"uncollectable object(s)") + + # move the uncollectable objects somewhere, + # so we don't see them again + FOUND_GARBAGE.extend(gc.garbage) + gc.garbage.clear() + + support.reap_children() + + +def _runtest_env_changed_exc(result: TestResult, runtests: RunTests, + display_failure: bool = True) -> None: + # Detect environment changes, handle exceptions. 
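_load_run_test() above removes the module from sys.modules before importing it, so a rerun really re-executes the test module instead of reusing the cached import. The same mechanism in isolation; json.tool is only a convenient stand-in module for the demo.

import importlib
import sys

first = importlib.import_module("json.tool")
sys.modules.pop("json.tool", None)
second = importlib.import_module("json.tool")
print(first is second)  # False: the module object was re-created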
+ pgo = runtests.pgo + + # Reset the environment_altered flag to detect if a test altered + # the environment + support.environment_altered = False + + if pgo: + display_failure = False + + test_name = result.test_name + try: + clear_caches() + support.gc_collect() + + with _save_env(test_name, runtests): + _load_run_test(result, runtests) + except support.ResourceDenied as msg: + if not runtests.quiet and not pgo: + print(f"{test_name} skipped -- {msg}", flush=True) + result.state = State.RESOURCE_DENIED + return + except unittest.SkipTest as msg: + if not runtests.quiet and not pgo: + print(f"{test_name} skipped -- {msg}", flush=True) + result.state = State.SKIPPED + return + except support.TestFailedWithDetails as exc: + msg = f"test {test_name} failed" + if display_failure: + msg = f"{msg} -- {exc}" + print(msg, file=sys.stderr, flush=True) + result.state = State.FAILED + result.errors = exc.errors + result.failures = exc.failures + result.stats = exc.stats + return + except support.TestFailed as exc: + msg = f"test {test_name} failed" + if display_failure: + msg = f"{msg} -- {exc}" + print(msg, file=sys.stderr, flush=True) + result.state = State.FAILED + result.stats = exc.stats + return + except support.TestDidNotRun: + result.state = State.DID_NOT_RUN + return + except KeyboardInterrupt: + print() + result.state = State.INTERRUPTED + return + except: + if not pgo: + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + result.state = State.UNCAUGHT_EXC + return + + if support.environment_altered: + result.set_env_changed() + # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) + if result.state is None: + result.state = State.PASSED + + +def _run_single_test(result: TestResult, runtests: RunTests) -> None: + # Capture stdout and stderr, set faulthandler timeout, + # and create JUnit XML report. + timeout = runtests.timeout + use_timeout = (bool(timeout) and threading_helper.can_start_thread) + if use_timeout: + faulthandler.dump_traceback_later(timeout, exit=True) + + try: + setup_support(runtests) + if runtests.use_junit: + support.junit_xml_list = [] + else: + support.junit_xml_list = None + + if runtests.output_on_failure: + support.verbose = True + + with capture_output() as stream: + _runtest_env_changed_exc(result, runtests, + display_failure=False) + + # Ignore output if the test passed successfully + if result.state != State.PASSED: + output = stream.getvalue() + sys.stderr.write(output) + sys.stderr.flush() + else: + # Tell tests to be moderately quiet + verbose = runtests.verbose + support.verbose = verbose + _runtest_env_changed_exc(result, runtests, + display_failure=not verbose) + + if runtests.use_junit: + xml_list = support.junit_xml_list + if xml_list: + import xml.etree.ElementTree as ET + result.xml_data = [ET.tostring(x).decode('us-ascii') + for x in xml_list] + finally: + if use_timeout: + faulthandler.cancel_dump_traceback_later() + support.junit_xml_list = None + + +def run_single_test(test_name: str, runtests: RunTests) -> TestResult: + """Run a single test. + + If runtests.xmlpath is not None, xml_data is a list containing each + generated testsuite element. 
+ """ + start_time = time.perf_counter() + result = TestResult(test_name) + try: + _run_single_test(result, runtests) + except: + if not runtests.pgo: + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + result.state = State.UNCAUGHT_EXC + result.duration = time.perf_counter() - start_time + return result diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index 9a60a3d40b4c2c..e941ba702fb496 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -1,9 +1,19 @@ +import contextlib +import io +import locale import math import os.path +import platform +import random import sys import sysconfig +import tempfile import textwrap from test import support +from test.support import os_helper + + +TEMP_DIR_PREFIX = 'test_python_' def format_duration(seconds): @@ -305,3 +315,206 @@ def get_build_info(): build.append("dtrace") return build + + +def abs_module_name(test_name: str, test_dir: str | None) -> str: + if test_name.startswith('test.') or test_dir: + return test_name + else: + # Import it from the test package + return 'test.' + test_name + + +# gh-90681: When rerunning tests, we might need to rerun the whole +# class or module suite if some its life-cycle hooks fail. +# Test level hooks are not affected. +_TEST_LIFECYCLE_HOOKS = frozenset(( + 'setUpClass', 'tearDownClass', + 'setUpModule', 'tearDownModule', +)) + +def normalize_test_name(test_full_name, *, is_error=False): + short_name = test_full_name.split(" ")[0] + if is_error and short_name in _TEST_LIFECYCLE_HOOKS: + if test_full_name.startswith(('setUpModule (', 'tearDownModule (')): + # if setUpModule() or tearDownModule() failed, don't filter + # tests with the test file name, don't use use filters. + return None + + # This means that we have a failure in a life-cycle hook, + # we need to rerun the whole module or class suite. + # Basically the error looks like this: + # ERROR: setUpClass (test.test_reg_ex.RegTest) + # or + # ERROR: setUpModule (test.test_reg_ex) + # So, we need to parse the class / module name. + lpar = test_full_name.index('(') + rpar = test_full_name.index(')') + return test_full_name[lpar + 1: rpar].split('.')[-1] + return short_name + + +def remove_testfn(test_name: str, verbose: int) -> None: + # Try to clean up os_helper.TESTFN if left behind. + # + # While tests shouldn't leave any files or directories behind, when a test + # fails that can be tedious for it to arrange. The consequences can be + # especially nasty on Windows, since if a test leaves a file open, it + # cannot be deleted by name (while there's nothing we can do about that + # here either, we can display the name of the offending test, which is a + # real help). 
+ name = os_helper.TESTFN + if not os.path.exists(name): + return + + if os.path.isdir(name): + import shutil + kind, nuker = "directory", shutil.rmtree + elif os.path.isfile(name): + kind, nuker = "file", os.unlink + else: + raise RuntimeError(f"os.path says {name!r} exists but is neither " + f"directory nor file") + + if verbose: + print_warning(f"{test_name} left behind {kind} {name!r}") + support.environment_altered = True + + try: + import stat + # fix possible permissions problems that might prevent cleanup + os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + nuker(name) + except Exception as exc: + print_warning(f"{test_name} left behind {kind} {name!r} " + f"and it couldn't be removed: {exc}") + + +def fix_umask(): + if support.is_emscripten: + # Emscripten has default umask 0o777, which breaks some tests. + # see https://github.com/emscripten-core/emscripten/issues/17269 + old_mask = os.umask(0) + if old_mask == 0o777: + os.umask(0o027) + else: + os.umask(old_mask) + + +def display_header(): + # Print basic platform information + print("==", platform.python_implementation(), *sys.version.split()) + print("==", platform.platform(aliased=True), + "%s-endian" % sys.byteorder) + print("== Python build:", ' '.join(get_build_info())) + print("== cwd:", os.getcwd()) + cpu_count = os.cpu_count() + if cpu_count: + print("== CPU count:", cpu_count) + print("== encodings: locale=%s, FS=%s" + % (locale.getencoding(), sys.getfilesystemencoding())) + + # This makes it easier to remember what to set in your local + # environment when trying to reproduce a sanitizer failure. + asan = support.check_sanitizer(address=True) + msan = support.check_sanitizer(memory=True) + ubsan = support.check_sanitizer(ub=True) + sanitizers = [] + if asan: + sanitizers.append("address") + if msan: + sanitizers.append("memory") + if ubsan: + sanitizers.append("undefined behavior") + if not sanitizers: + return + + print(f"== sanitizers: {', '.join(sanitizers)}") + for sanitizer, env_var in ( + (asan, "ASAN_OPTIONS"), + (msan, "MSAN_OPTIONS"), + (ubsan, "UBSAN_OPTIONS"), + ): + options= os.environ.get(env_var) + if sanitizer and options is not None: + print(f"== {env_var}={options!r}") + + +def select_temp_dir(tmp_dir) -> str: + if tmp_dir: + tmp_dir = os.path.expanduser(tmp_dir) + elif sysconfig.is_python_build(): + # When tests are run from the Python build directory, it is best + # practice to keep the test files in a subfolder. This eases the + # cleanup of leftover files using the "make distclean" command. + tmp_dir = sysconfig.get_config_var('abs_builddir') + if tmp_dir is None: + # bpo-30284: On Windows, only srcdir is available. Using + # abs_builddir mostly matters on UNIX when building Python + # out of the source tree, especially when the source tree + # is read only. + tmp_dir = sysconfig.get_config_var('srcdir') + tmp_dir = os.path.join(tmp_dir, 'build') + else: + tmp_dir = tempfile.gettempdir() + return os.path.abspath(tmp_dir) + + +def make_temp_dir(tmp_dir, is_worker): + os.makedirs(tmp_dir, exist_ok=True) + + # Define a writable temp dir that will be used as cwd while running + # the tests. The name of the dir includes the pid to allow parallel + # testing (see the -j option). + # Emscripten and WASI have stubbed getpid(), Emscripten has only + # milisecond clock resolution. Use randint() instead. 
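The cleanup in remove_testfn() above first restores permissive permissions, then picks the right removal function depending on whether a file or a directory was left behind. A condensed demo of that strategy on a throwaway directory created only for the example.

import os
import shutil
import stat
import tempfile

leftover = tempfile.mkdtemp(prefix="leftover_")
os.chmod(leftover, 0)   # simulate a test leaving an unreadable directory behind

# Fix permissions that might prevent cleanup, then remove it.
os.chmod(leftover, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker = shutil.rmtree if os.path.isdir(leftover) else os.unlink
nuker(leftover)
print(os.path.exists(leftover))  # False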
+ if sys.platform in {"emscripten", "wasi"}: + nounce = random.randint(0, 1_000_000) + else: + nounce = os.getpid() + + if is_worker: + test_cwd = 'worker_{}'.format(nounce) + else: + test_cwd = '{}'.format(nounce) + test_cwd = TEMP_DIR_PREFIX + test_cwd + test_cwd += os_helper.FS_NONASCII + test_cwd = os.path.join(tmp_dir, test_cwd) + return test_cwd + + +def cleanup_directory(tmp_dir): + import glob + + path = os.path.join(glob.escape(tmp_dir), TEMP_DIR_PREFIX + '*') + print("Cleanup %s directory" % tmp_dir) + for name in glob.glob(path): + if os.path.isdir(name): + print("Remove directory: %s" % name) + os_helper.rmtree(name) + else: + print("Remove file: %s" % name) + os_helper.unlink(name) + + +@contextlib.contextmanager +def capture_output(): + stream = io.StringIO() + + print_warning = support.print_warning + orig_stdout = sys.stdout + orig_stderr = sys.stderr + orig_print_warnings_stderr = print_warning.orig_stderr + try: + sys.stdout = stream + sys.stderr = stream + # print_warning() writes into the temporary stream to preserve + # messages order. If support.environment_altered becomes true, + # warnings will be written to sys.stderr below. + print_warning.orig_stderr = stream + + yield stream + finally: + sys.stdout = orig_stdout + sys.stderr = orig_stderr + print_warning.orig_stderr = orig_print_warnings_stderr diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py new file mode 100644 index 00000000000000..956c6f5598e862 --- /dev/null +++ b/Lib/test/libregrtest/worker.py @@ -0,0 +1,75 @@ +import os +import subprocess +import sys +from typing import TextIO + +from test import support +from test.support import os_helper + +from .setup import setup_tests, setup_test_dir +from .single import run_single_test, RunTests, TestResult + + +USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) + + +def create_worker_process(runtests: RunTests, + output_file: TextIO, + tmp_dir: str | None = None) -> subprocess.Popen: + worker_json = runtests.as_json() + + python = runtests.python_executable + if python is not None: + executable = python + else: + executable = [sys.executable] + cmd = [*executable, *support.args_from_interpreter_flags(), + '-u', # Unbuffered stdout and stderr + '-m', 'test.regrtest', + '--worker-json', worker_json] + + env = dict(os.environ) + if tmp_dir is not None: + env['TMPDIR'] = tmp_dir + env['TEMP'] = tmp_dir + env['TMP'] = tmp_dir + + # Running the child from the same working directory as regrtest's original + # invocation ensures that TEMPDIR for the child is the same when + # sysconfig.is_python_build() is true. See issue 15300. 
+ kw = dict( + env=env, + stdout=output_file, + # bpo-45410: Write stderr into stdout to keep messages order + stderr=output_file, + text=True, + close_fds=(os.name != 'nt'), + cwd=os_helper.SAVEDCWD, + ) + if USE_PROCESS_GROUP: + kw['start_new_session'] = True + return subprocess.Popen(cmd, **kw) + + +def worker_process(worker_json: str) -> None: + runtests = RunTests.from_json(worker_json) + test_name = runtests.tests[0] + + setup_test_dir(runtests.test_dir) + setup_tests(runtests) + + match_tests = runtests.match_tests + if runtests.rerun: + if match_tests: + matching = "matching: " + ", ".join(match_tests) + print(f"Re-running {test_name} in verbose mode ({matching})", flush=True) + else: + print(f"Re-running {test_name} in verbose mode", flush=True) + + result: TestResult = run_single_test(test_name, runtests) + print() # Force a newline (just in case) + + # Serialize TestResult as JSON into stdout + result.as_json_into(sys.stdout) + sys.stdout.flush() + sys.exit(0) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 38ad965e155302..8fcc6724d196b9 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -887,27 +887,31 @@ def inner(*args, **kwds): MAX_Py_ssize_t = sys.maxsize -def set_memlimit(limit): - global max_memuse - global real_max_memuse +def _parse_memlimit(limit: str) -> int: sizes = { 'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G, } - m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE) if m is None: - raise ValueError('Invalid memory limit %r' % (limit,)) - memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) - real_max_memuse = memlimit - if memlimit > MAX_Py_ssize_t: - memlimit = MAX_Py_ssize_t + raise ValueError(f'Invalid memory limit: {limit!r}') + return int(float(m.group(1)) * sizes[m.group(2).lower()]) + +def set_memlimit(limit: str): + global max_memuse + global real_max_memuse + memlimit = _parse_memlimit(limit) if memlimit < _2G - 1: - raise ValueError('Memory limit %r too low to be useful' % (limit,)) + raise ValueError('Memory limit {limit!r} too low to be useful') + + real_max_memuse = memlimit + memlimit = min(memlimit, MAX_Py_ssize_t) max_memuse = memlimit + class _MemoryWatchdog: """An object which periodically watches the process' memory consumption and prints it out. 
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index aff5404408f8d0..45fad81d510ddd 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -22,7 +22,7 @@ from test import support from test.support import os_helper, TestStats from test.libregrtest import utils, setup -from test.libregrtest.runtest import normalize_test_name +from test.libregrtest.utils import normalize_test_name if not support.has_subprocess_support: raise unittest.SkipTest("test module requires subprocess") @@ -75,11 +75,6 @@ def test_wait(self): ns = libregrtest._parse_args(['--wait']) self.assertTrue(ns.wait) - def test_worker_args(self): - ns = libregrtest._parse_args(['--worker-args', '[[], {}]']) - self.assertEqual(ns.worker_args, '[[], {}]') - self.checkError(['--worker-args'], 'expected one argument') - def test_start(self): for opt in '-S', '--start': with self.subTest(opt=opt): @@ -288,7 +283,7 @@ def test_coverage(self): for opt in '-T', '--coverage': with self.subTest(opt=opt): ns = libregrtest._parse_args([opt]) - self.assertTrue(ns.trace) + self.assertTrue(ns.coverage) def test_coverdir(self): for opt in '-D', '--coverdir': @@ -587,9 +582,9 @@ def list_regex(line_format, tests): self.check_line(output, f'Result: {state}', full=True) def parse_random_seed(self, output): - match = self.regex_search(r'Using random seed ([0-9]+)', output) + match = self.regex_search(r'Using random seed: ([0-9]+)', output) randseed = int(match.group(1)) - self.assertTrue(0 <= randseed <= 10000000, randseed) + self.assertTrue(0 <= randseed <= 100_000_000, randseed) return randseed def run_command(self, args, input=None, exitcode=0, **kw): diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 64280739f00946..5b57c5fd54a68d 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -760,7 +760,45 @@ def recursive_function(depth): else: self.fail("RecursionError was not raised") - #self.assertEqual(available, 2) + def test_parse_memlimit(self): + parse = support._parse_memlimit + KiB = 1024 + MiB = KiB * 1024 + GiB = MiB * 1024 + TiB = GiB * 1024 + self.assertEqual(parse('0k'), 0) + self.assertEqual(parse('3k'), 3 * KiB) + self.assertEqual(parse('2.4m'), int(2.4 * MiB)) + self.assertEqual(parse('4g'), int(4 * GiB)) + self.assertEqual(parse('1t'), TiB) + + for limit in ('', '3', '3.5.10k', '10x'): + with self.subTest(limit=limit): + with self.assertRaises(ValueError): + parse(limit) + + def test_set_memlimit(self): + _4GiB = 4 * 1024 ** 3 + TiB = 1024 ** 4 + old_max_memuse = support.max_memuse + old_real_max_memuse = support.real_max_memuse + try: + if sys.maxsize > 2**32: + support.set_memlimit('4g') + self.assertEqual(support.max_memuse, _4GiB) + self.assertEqual(support.real_max_memuse, _4GiB) + + big = 2**100 // TiB + support.set_memlimit(f'{big}t') + self.assertEqual(support.max_memuse, sys.maxsize) + self.assertEqual(support.real_max_memuse, big * TiB) + else: + support.set_memlimit('4g') + self.assertEqual(support.max_memuse, sys.maxsize) + self.assertEqual(support.real_max_memuse, _4GiB) + finally: + support.max_memuse = old_max_memuse + support.real_max_memuse = old_real_max_memuse # XXX -follows a list of untested API # make_legacy_pyc @@ -773,7 +811,6 @@ def recursive_function(depth): # EnvironmentVarGuard # transient_internet # run_with_locale - # set_memlimit # bigmemtest # precisionbigmemtest # bigaddrspacetest diff --git a/Misc/NEWS.d/next/Tests/2023-09-05-21-13-17.gh-issue-108834._CnOtt.rst 
b/Misc/NEWS.d/next/Tests/2023-09-05-21-13-17.gh-issue-108834._CnOtt.rst
new file mode 100644
index 00000000000000..25dc6955363838
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2023-09-05-21-13-17.gh-issue-108834._CnOtt.rst
@@ -0,0 +1,6 @@
+Refactor ``test.libregrtest`` to split the code into sub-files and add new
+classes. Make function arguments simpler: instead of passing a complex class
+with an unclear API (the ``ns`` Namespace object), pass simple types such as
+bool and str. Functions become more "stateless" and depend only on their
+arguments. This should ease future maintenance of the code and prepare the
+project for adding type annotations. Patch by Victor Stinner.
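The NEWS entry's point about replacing the ns Namespace with plain arguments, illustrated with made-up helper names (compare dash_R(ns, ...) before the patch with runtest_refleak(..., huntrleaks, quiet) after it in refleak.py); the Namespace contents are invented for the example.

import argparse

ns = argparse.Namespace(quiet=False, huntrleaks=(5, 4, "reflog.txt"))

def old_style(ns):
    # the helper must know the whole Namespace and its attribute names
    return not ns.quiet and ns.huntrleaks is not None

def new_style(quiet: bool, huntrleaks: tuple[int, int, str] | None):
    # the helper depends only on the values it is given
    return not quiet and huntrleaks is not None

assert old_style(ns) == new_style(ns.quiet, ns.huntrleaks)
print("both styles agree")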