diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 9b202bd4..f739d297 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v4 - name: Build and Check Package - uses: hynek/build-and-inspect-python-package@v2.8 + uses: hynek/build-and-inspect-python-package@v2.9 deploy: needs: package @@ -39,7 +39,9 @@ jobs: path: dist - name: Publish package to PyPI - uses: pypa/gh-action-pypi-publish@v1.9.0 + uses: pypa/gh-action-pypi-publish@v1.10.1 + with: + attestations: true - name: Push tag run: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b81f37e3..ca33e3a1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,7 +22,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Build and Check Package - uses: hynek/build-and-inspect-python-package@v2.8 + uses: hynek/build-and-inspect-python-package@v2.9 test: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb240bdb..6fffdf2e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.6.1" + rev: "v0.6.5" hooks: - id: ruff args: ["--fix"] @@ -23,7 +23,7 @@ repos: language: python additional_dependencies: [pygments, restructuredtext_lint] - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.11.1 + rev: v1.11.2 hooks: - id: mypy files: ^(src/|testing/) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 2eff34c1..4c3bcec2 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -5,16 +5,18 @@ from queue import Empty from queue import Queue import sys +import traceback from typing import Any +from typing import Callable from typing import Sequence import warnings -import traceback import execnet import pytest from xdist.remote import Producer from xdist.remote import WorkerInfo +from xdist.scheduler import CustomGroup from xdist.scheduler 
import EachScheduling from xdist.scheduler import LoadFileScheduling from xdist.scheduler import LoadGroupScheduling @@ -22,7 +24,6 @@ from xdist.scheduler import LoadScopeScheduling from xdist.scheduler import Scheduling from xdist.scheduler import WorkStealingScheduling -from xdist.scheduler import CustomGroup from xdist.workermanage import NodeManager from xdist.workermanage import WorkerController @@ -60,14 +61,14 @@ def __init__(self, config: pytest.Config) -> None: self._failed_collection_errors: dict[object, bool] = {} self._active_nodes: set[WorkerController] = set() self._failed_nodes_count = 0 - self.saved_put = None + self.saved_put: Callable[[tuple[str, dict[str, Any]]], None] self.remake_nodes = False self.ready_to_run_tests = False self._max_worker_restart = get_default_max_worker_restart(self.config) # summary message to print at the end of the session self._summary_report: str | None = None self.terminal = config.pluginmanager.getplugin("terminalreporter") - self.worker_status: dict[WorkerController, str] = {} + self.worker_status: dict[str, str] = {} if self.terminal: self.trdist = TerminalDistReporter(config) config.pluginmanager.register(self.trdist, "terminaldistreporter") @@ -180,45 +181,46 @@ def loop_once(self) -> None: self.triggershutdown() - def is_node_finishing(self, node: WorkerController): + def is_node_finishing(self, node: WorkerController) -> bool: """Check if a test worker is considered to be finishing. Evaluate whether it's on its last test, or if no tests are pending. 
""" + assert self.sched is not None + assert type(self.sched) is CustomGroup pending = self.sched.node2pending.get(node) return pending is not None and len(pending) < 2 - def is_node_clear(self, node: WorkerController): - """Check if a test worker has no pending tests.""" - pending = self.sched.node2pending.get(node) - return pending is None or len(pending) == 0 - - - def are_all_nodes_finishing(self): + def are_all_nodes_finishing(self) -> bool: """Check if all workers are finishing (See 'is_node_finishing' above).""" + assert self.sched is not None return all(self.is_node_finishing(node) for node in self.sched.nodes) - def are_all_nodes_done(self): + def are_all_nodes_done(self) -> bool: """Check if all nodes have reported to finish.""" return all(s == "finished" for s in self.worker_status.values()) - def are_all_active_nodes_collected(self): + def are_all_active_nodes_collected(self) -> bool: """Check if all nodes have reported collection to be complete.""" if not all(n.gateway.id in self.worker_status for n in self._active_nodes): return False return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes) - def reset_nodes_if_needed(self): + def reset_nodes_if_needed(self) -> None: + assert self.sched is not None + assert type(self.sched) is CustomGroup if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched: self.reset_nodes() - def reset_nodes(self): + def reset_nodes(self) -> None: """Issue shutdown notices to workers for rescheduling purposes.""" + assert self.sched is not None + assert type(self.sched) is CustomGroup if len(self.sched.pending) != 0: self.remake_nodes = True for node in self.sched.nodes: @@ -226,22 +228,28 @@ def reset_nodes(self): node.shutdown() - def reschedule(self): + def reschedule(self) -> None: """Reschedule tests.""" + assert self.sched is not None + assert type(self.sched) is CustomGroup self.sched.do_resched = False self.sched.check_schedule(self.sched.nodes[0], 1.0, 
True) - def prepare_for_reschedule(self): + def prepare_for_reschedule(self) -> None: """Update test workers and their status tracking so rescheduling is ready.""" + assert type(self.sched) is CustomGroup + assert self.sched is not None self.remake_nodes = False num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers'] self.trdist._status = {} + assert self.nodemanager is not None new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers) self.worker_status = {} self._active_nodes = set() self._active_nodes.update(new_nodes) self.sched.node2pending = {} + assert type(self.sched) is CustomGroup self.sched.do_resched = True # @@ -287,7 +295,9 @@ def worker_workerfinished(self, node: WorkerController) -> None: try: self.prepare_for_reschedule() except Exception as e: - self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(e))}" + msg = ("Exception caught during preparation for rescheduling. Giving up." + f"\n{''.join(traceback.format_exception(e))}") + self.shouldstop = msg return self.config.hook.pytest_testnodedown(node=node, error=None) if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt @@ -308,10 +318,11 @@ def worker_workerfinished(self, node: WorkerController) -> None: assert not crashitem, (crashitem, node) self._active_nodes.remove(node) - def update_worker_status(self, node, status): + def update_worker_status(self, node: WorkerController, status: str) -> None: """Track the worker status. - Can be used at callbacks like 'worker_workerfinished' so we remember wchic event was reported last by each worker. + Can be used at callbacks like 'worker_workerfinished' so we remember which event + was reported last by each worker.
""" self.worker_status[node.workerinfo["id"]] = status diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index 34b791d7..6395e2a5 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -1,6 +1,6 @@ +from xdist.scheduler.customgroup import CustomGroup as CustomGroup from xdist.scheduler.each import EachScheduling as EachScheduling from xdist.scheduler.load import LoadScheduling as LoadScheduling -from xdist.scheduler.customgroup import CustomGroup as CustomGroup from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py index fec7001b..5d1439a2 100644 --- a/src/xdist/scheduler/customgroup.py +++ b/src/xdist/scheduler/customgroup.py @@ -1,7 +1,8 @@ from __future__ import annotations from itertools import cycle -from typing import Sequence, Any +from typing import Any +from typing import Sequence import pytest @@ -10,6 +11,7 @@ from xdist.workermanage import parse_spec_config from xdist.workermanage import WorkerController + class CustomGroup: """Implement grouped load scheduling across a variable number of nodes. @@ -203,7 +205,7 @@ def remove_pending_tests_from_node( ) -> None: raise NotImplementedError() - def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession=False) -> None: + def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None: """Maybe schedule new items on the node. 
If there are any globally pending nodes left then this will @@ -226,7 +228,7 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} + schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} for _ in range(len(dist_group['test_indices'])): n = next(nodes) #needs cleaner way to be identified @@ -235,13 +237,16 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}" + message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:" + f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}") self.report_line(message) else: - pending = self.node2pending.get(node) + pending = self.node2pending.get(node, []) if len(pending) < 2: - self.report_line(f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending") + self.report_line( + f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending" + ) node.shutdown() self.log("num items waiting for node:", len(self.pending)) @@ -301,7 +306,7 @@ def schedule(self) -> None: if not self.collection: return - dist_groups = {} + dist_groups: dict[str, dict[Any, Any]] = {} if self.is_first_time: for i, test in enumerate(self.collection): @@ -338,7 +343,7 @@ def schedule(self) -> None: dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log = {n.gateway.id: [] for n in 
self.nodes[0:dist_group['group_workers']]} + schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} for _ in range(len(dist_group['test_indices'])): n = next(nodes) # needs cleaner way to be identified @@ -346,7 +351,8 @@ def schedule(self) -> None: schedule_log[n.gateway.id].extend(tests_per_node) self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = f"\n[-] [csg] schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}" + message = ("\n[-] [csg] schedule: processed scheduling for " + f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}") self.report_line(message) def _send_tests(self, node: WorkerController, num: int) -> None: @@ -356,7 +362,7 @@ def _send_tests(self, node: WorkerController, num: int) -> None: self.node2pending[node].extend(tests_per_node) node.send_runtest_some(tests_per_node) - def _send_tests_group(self, node: WorkerController, num: int, dist_group_key) -> None: + def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None: tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num] if tests_per_node: del self.dist_groups[dist_group_key]['pending_indices'][:num] @@ -396,4 +402,4 @@ def _check_nodes_have_same_collection(self) -> bool: def report_line(self, line: str) -> None: if self.terminal and self.config.option.verbose >= 0: - self.terminal.write_line(line) \ No newline at end of file + self.terminal.write_line(line) diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 2b410108..963dbb8a 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -11,7 +11,6 @@ from typing import Literal from typing import Sequence from typing import Union -from typing import Optional import uuid import warnings @@ -83,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None: def 
setup_nodes( self, putevent: Callable[[tuple[str, dict[str, Any]]], None], - max_nodes: Optional[int] = None + max_nodes: int | None = None ) -> list[WorkerController]: self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") diff --git a/xdist-testing-ntop/test.py b/xdist-testing-ntop/test.py index ff8f9996..4b370b6c 100644 --- a/xdist-testing-ntop/test.py +++ b/xdist-testing-ntop/test.py @@ -1,6 +1,7 @@ -import pytest import time +import pytest + @pytest.mark.xdist_custom(name="low_4") def test_1():