diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py
index 4c3bcec2..3ead8b55 100644
--- a/src/xdist/dsession.py
+++ b/src/xdist/dsession.py
@@ -180,7 +180,6 @@ def loop_once(self) -> None:
             if self.sched.tests_finished:
                 self.triggershutdown()

-
     def is_node_finishing(self, node: WorkerController) -> bool:
         """Check if a test worker is considered to be finishing.

@@ -191,32 +190,33 @@ def is_node_finishing(self, node: WorkerController) -> bool:
         pending = self.sched.node2pending.get(node)
         return pending is not None and len(pending) < 2

-
     def are_all_nodes_finishing(self) -> bool:
         """Check if all workers are finishing (See 'is_node_finishing' above)."""
         assert self.sched is not None
         return all(self.is_node_finishing(node) for node in self.sched.nodes)

-
     def are_all_nodes_done(self) -> bool:
         """Check if all nodes have reported to finish."""
         return all(s == "finished" for s in self.worker_status.values())

-
     def are_all_active_nodes_collected(self) -> bool:
         """Check if all nodes have reported collection to be complete."""
         if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
             return False
-        return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes)
-
+        return all(
+            self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes
+        )

     def reset_nodes_if_needed(self) -> None:
         assert self.sched is not None
         assert type(self.sched) is CustomGroup
-        if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched:
+        if (
+            self.are_all_nodes_finishing()
+            and self.ready_to_run_tests
+            and not self.sched.do_resched
+        ):
             self.reset_nodes()

-
     def reset_nodes(self) -> None:
         """Issue shutdown notices to workers for rescheduling purposes."""
         assert self.sched is not None
@@ -227,7 +227,6 @@ def reset_nodes(self) -> None:
             if self.is_node_finishing(node):
                 node.shutdown()

-
     def reschedule(self) -> None:
         """Reschedule tests."""
         assert self.sched is not None
@@ -235,13 +234,14 @@ def reschedule(self) -> None:
         self.sched.do_resched = False
         self.sched.check_schedule(self.sched.nodes[0], 1.0, True)

-
     def prepare_for_reschedule(self) -> None:
         """Update test workers and their status tracking so rescheduling is ready."""
         assert type(self.sched) is CustomGroup
         assert self.sched is not None
         self.remake_nodes = False
-        num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers']
+        num_workers = self.sched.dist_groups[self.sched.pending_groups[0]][
+            "group_workers"
+        ]
         self.trdist._status = {}
         assert self.nodemanager is not None
         new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
@@ -295,8 +295,10 @@ def worker_workerfinished(self, node: WorkerController) -> None:
             try:
                 self.prepare_for_reschedule()
             except Exception as e:
-                msg = ("Exception caught during preparation for rescheduling. Giving up."
-                       f"\n{''.join(traceback.format_exception(e))}")
+                msg = (
+                    "Exception caught during preparation for rescheduling. Giving up."
+                    f"\n{''.join(traceback.format_exception(e))}"
+                )
                 self.shouldstop = msg
             return
         self.config.hook.pytest_testnodedown(node=node, error=None)
@@ -392,7 +394,9 @@ def worker_collectionfinish(
         scheduling the first time it logs which scheduler is in use.
         """
         if self.shuttingdown:
-            self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}")
+            self.report_line(
+                f"[-] [dse] collectionfinish while closing {node.gateway.id}"
+            )
             return
         self.update_worker_status(node, "collected")
@@ -412,7 +416,9 @@ def worker_collectionfinish(
             self.trdist.ensure_show_status()
             self.terminal.write_line("")
             if self.config.option.verbose > 0:
-                self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}")
+                self.report_line(
+                    f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}"
+                )
             if isinstance(self.sched, CustomGroup):
                 if self.ready_to_run_tests and self.are_all_active_nodes_collected():
                     # we're coming back here after finishing a batch of tests - so start the next batch
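The dsession.py changes above hinge on the "finishing" heuristic: a worker counts as nearly done once fewer than two tests remain queued for it. A self-contained sketch (not the xdist API; worker ids and the plain-dict `node2pending` are stand-ins for the `WorkerController` mapping `DSession` actually uses):

```python
# Hypothetical, standalone illustration of the is_node_finishing predicate.
node2pending = {"gw0": [14], "gw1": [7, 8, 9]}


def is_node_finishing(node_id: str) -> bool:
    # Mirrors the diff: known worker with fewer than 2 pending tests.
    pending = node2pending.get(node_id)
    return pending is not None and len(pending) < 2


assert is_node_finishing("gw0")      # one pending test -> finishing
assert not is_node_finishing("gw1")  # three pending tests -> still busy
assert not is_node_finishing("gw9")  # unknown worker -> not finishing
```

Once every worker passes this check, `reset_nodes_if_needed` issues shutdowns so the next group's batch can be rescheduled on a fresh set of nodes.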
""" if self.shuttingdown: - self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}") + self.report_line( + f"[-] [dse] collectionfinish while closing {node.gateway.id}" + ) return self.update_worker_status(node, "collected") @@ -412,7 +416,9 @@ def worker_collectionfinish( self.trdist.ensure_show_status() self.terminal.write_line("") if self.config.option.verbose > 0: - self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}") + self.report_line( + f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}" + ) if isinstance(self.sched, CustomGroup): if self.ready_to_run_tests and self.are_all_active_nodes_collected(): # we're coming back here after finishing a batch of tests - so start the next batch diff --git a/src/xdist/remote.py b/src/xdist/remote.py index 0ec9047d..e032f1b7 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -209,7 +209,9 @@ def pytest_collection_modifyitems( ) -> None: # add the group name to nodeid as suffix if --dist=loadgroup if config.getvalue("loadgroup") or config.getvalue("customgroup"): - functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom" + functional_mark = ( + "xdist_group" if config.getvalue("loadgroup") else "xdist_custom" + ) for item in items: mark = item.get_closest_marker(functional_mark) if not mark: diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py index 5d1439a2..85824225 100644 --- a/src/xdist/scheduler/customgroup.py +++ b/src/xdist/scheduler/customgroup.py @@ -189,7 +189,6 @@ def mark_test_complete( self.check_schedule(node, duration=duration) def mark_test_pending(self, item: str) -> None: - assert self.collection is not None self.pending.insert( 0, @@ -205,7 +204,9 @@ def remove_pending_tests_from_node( ) -> None: raise NotImplementedError() - def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None: + def check_schedule( + self, node: WorkerController, duration: float = 0, from_dsession: bool = False + ) -> None: """Maybe schedule new items on the node. If there are any globally pending nodes left then this will @@ -214,7 +215,9 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess heuristic to influence how many tests the node is assigned. 
""" if node.shutting_down: - self.report_line(f"[-] [csg] {node.workerinput['workerid']} is already shutting down") + self.report_line( + f"[-] [csg] {node.workerinput['workerid']} is already shutting down" + ) return if self.pending: @@ -227,18 +230,25 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess if self.pending_groups: dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] - nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} - for _ in range(len(dist_group['test_indices'])): + nodes = cycle(self.nodes[0 : dist_group["group_workers"]]) + schedule_log: dict[str, Any] = { + n.gateway.id: [] + for n in self.nodes[0 : dist_group["group_workers"]] + } + for _ in range(len(dist_group["test_indices"])): n = next(nodes) - #needs cleaner way to be identified - tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + # needs cleaner way to be identified + tests_per_node = self.dist_groups[dist_group_key][ + "pending_indices" + ][:1] schedule_log[n.gateway.id].extend(tests_per_node) self._send_tests_group(n, 1, dist_group_key) del self.dist_groups[dist_group_key] - message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:" - f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}") + message = ( + f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:" + f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}" + ) self.report_line(message) else: @@ -310,26 +320,28 @@ def schedule(self) -> None: if self.is_first_time: for i, test in enumerate(self.collection): - if '@' in test: - group_mark = test.split('@')[-1] - group_workers = int(group_mark.split('_')[-1]) + if "@" in test: + group_mark = test.split("@")[-1] + group_workers = int(group_mark.split("_")[-1]) if group_workers > len(self.nodes): # We can only distribute across as many nodes as we have available # If a group requests more, we fallback to our actual max group_workers = len(self.nodes) else: - group_mark = 'default' + group_mark = "default" group_workers = len(self.nodes) - existing_tests = dist_groups.get(group_mark, {}).get('tests', []) + existing_tests = dist_groups.get(group_mark, {}).get("tests", []) existing_tests.append(test) - existing_indices = dist_groups.get(group_mark, {}).get('test_indices', []) + existing_indices = dist_groups.get(group_mark, {}).get( + "test_indices", [] + ) existing_indices.append(i) dist_groups[group_mark] = { - 'tests': existing_tests, - 'group_workers': group_workers, - 'test_indices': existing_indices, - 'pending_indices': existing_indices + "tests": existing_tests, + "group_workers": group_workers, + "test_indices": existing_indices, + "pending_indices": existing_indices, } self.dist_groups = dist_groups self.pending_groups = list(dist_groups.keys()) @@ -342,17 +354,21 @@ def schedule(self) -> None: return dist_group_key = self.pending_groups.pop(0) dist_group = self.dist_groups[dist_group_key] - nodes = cycle(self.nodes[0:dist_group['group_workers']]) - schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} - for _ in range(len(dist_group['test_indices'])): + nodes = cycle(self.nodes[0 : dist_group["group_workers"]]) + schedule_log: dict[str, Any] = { + n.gateway.id: [] for n in self.nodes[0 : dist_group["group_workers"]] + } + for _ in range(len(dist_group["test_indices"])): n = 
diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py
index 963dbb8a..c130bb5c 100644
--- a/src/xdist/workermanage.py
+++ b/src/xdist/workermanage.py
@@ -82,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None:
     def setup_nodes(
         self,
         putevent: Callable[[tuple[str, dict[str, Any]]], None],
-        max_nodes: int | None = None
+        max_nodes: int | None = None,
     ) -> list[WorkerController]:
         self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
         self.trace("setting up nodes")
diff --git a/xdist-testing-ntop/test.py b/xdist-testing-ntop/test.py
index 4b370b6c..e43e881b 100644
--- a/xdist-testing-ntop/test.py
+++ b/xdist-testing-ntop/test.py
@@ -8,21 +8,25 @@ def test_1():
     time.sleep(2)
     assert True

+
 @pytest.mark.xdist_custom(name="low_4")
 def test_2():
     time.sleep(2)
     assert True

+
 @pytest.mark.xdist_custom(name="low_4")
 def test_3():
     time.sleep(2)
     assert True

+
 @pytest.mark.xdist_custom(name="low_4")
 def test_4():
     time.sleep(2)
     assert True

+
 # @pytest.mark.xdist_custom(name="low_4")
 # def test_4a():
 #     time.sleep(2)
@@ -48,48 +52,58 @@ def test_4():
 #     time.sleep(2)
 #     assert True

+
 @pytest.mark.xdist_custom(name="med_2")
 def test_5():
     time.sleep(3)
     assert True

+
 @pytest.mark.xdist_custom(name="med_2")
 def test_6():
     time.sleep(3)
     assert True

+
 @pytest.mark.xdist_custom(name="med_2")
 def test_7():
     time.sleep(3)
     assert True

+
 @pytest.mark.xdist_custom(name="med_2")
 def test_8():
     time.sleep(3)
     assert True

+
 @pytest.mark.xdist_custom(name="high_1")
 def test_9():
     time.sleep(5)
     assert True

+
 @pytest.mark.xdist_custom(name="high_1")
 def test_10():
     time.sleep(5)
     assert True

+
 def test_11():
     time.sleep(1)
     assert True

+
 def test_12():
     time.sleep(1)
     assert True

+
 def test_13():
     time.sleep(1)
     assert True

+
 def test_14():
     time.sleep(1)
     assert True
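For reference, a sketch of how the sample suite above is meant to be exercised. The flag name is inferred from `config.getvalue("customgroup")` by analogy with how `loadgroup` maps to `--dist=loadgroup`, so treat the invocation as an assumption rather than settled CLI:

```python
# Hypothetical invocation: "low_4" tests fan out across up to 4 workers,
# "med_2" across 2, "high_1" runs on a single worker, and unmarked tests
# form the "default" group over all workers.
import pytest

pytest.main(["xdist-testing-ntop/test.py", "-n", "4", "--dist", "customgroup"])
```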