Commit
Dev/ci/tur 21619 lint merge 2 (#7)
* [pre-commit.ci] pre-commit autoupdate (pytest-dev#1120)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.1 → v0.6.2](astral-sh/ruff-pre-commit@v0.6.1...v0.6.2)
- [github.com/pre-commit/mirrors-mypy: v1.11.1 → v1.11.2](pre-commit/mirrors-mypy@v1.11.1...v1.11.2)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* build(deps): bump pypa/gh-action-pypi-publish (pytest-dev#1123)

Bumps the github-actions group with 1 update: [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish).


Updates `pypa/gh-action-pypi-publish` from 1.9.0 to 1.10.0
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.9.0...v1.10.0)

---
updated-dependencies:
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: github-actions
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (pytest-dev#1124)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.2 → v0.6.3](astral-sh/ruff-pre-commit@v0.6.2...v0.6.3)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* Publish package with attestations (pytest-dev#1125)

Follow up to pytest-dev#1123.

* build(deps): bump the github-actions group with 2 updates (pytest-dev#1127)

Bumps the github-actions group with 2 updates: [hynek/build-and-inspect-python-package](https://github.com/hynek/build-and-inspect-python-package) and [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish).


Updates `hynek/build-and-inspect-python-package` from 2.8 to 2.9
- [Release notes](https://github.com/hynek/build-and-inspect-python-package/releases)
- [Changelog](https://github.com/hynek/build-and-inspect-python-package/blob/main/CHANGELOG.md)
- [Commits](hynek/build-and-inspect-python-package@v2.8...v2.9)

Updates `pypa/gh-action-pypi-publish` from 1.10.0 to 1.10.1
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.10.0...v1.10.1)

---
updated-dependencies:
- dependency-name: hynek/build-and-inspect-python-package
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: github-actions
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: github-actions
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (pytest-dev#1128)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.3 → v0.6.4](astral-sh/ruff-pre-commit@v0.6.3...v0.6.4)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (pytest-dev#1129)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.4 → v0.6.5](astral-sh/ruff-pre-commit@v0.6.4...v0.6.5)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* run ruff format

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Bruno Oliveira <nicoddemus@gmail.com>
4 people authored Sep 20, 2024
1 parent c82840f · commit d62d632
Showing 5 changed files with 86 additions and 47 deletions.
36 changes: 21 additions & 15 deletions src/xdist/dsession.py
@@ -180,7 +180,6 @@ def loop_once(self) -> None:
if self.sched.tests_finished:
self.triggershutdown()


def is_node_finishing(self, node: WorkerController) -> bool:
"""Check if a test worker is considered to be finishing.
@@ -191,32 +190,33 @@ def is_node_finishing(self, node: WorkerController) -> bool:
pending = self.sched.node2pending.get(node)
return pending is not None and len(pending) < 2


def are_all_nodes_finishing(self) -> bool:
"""Check if all workers are finishing (See 'is_node_finishing' above)."""
assert self.sched is not None
return all(self.is_node_finishing(node) for node in self.sched.nodes)


def are_all_nodes_done(self) -> bool:
"""Check if all nodes have reported to finish."""
return all(s == "finished" for s in self.worker_status.values())


def are_all_active_nodes_collected(self) -> bool:
"""Check if all nodes have reported collection to be complete."""
if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
return False
return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes)

return all(
self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes
)

def reset_nodes_if_needed(self) -> None:
assert self.sched is not None
assert type(self.sched) is CustomGroup
if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched:
if (
self.are_all_nodes_finishing()
and self.ready_to_run_tests
and not self.sched.do_resched
):
self.reset_nodes()


def reset_nodes(self) -> None:
"""Issue shutdown notices to workers for rescheduling purposes."""
assert self.sched is not None
@@ -227,21 +227,21 @@ def reset_nodes(self) -> None:
if self.is_node_finishing(node):
node.shutdown()


def reschedule(self) -> None:
"""Reschedule tests."""
assert self.sched is not None
assert type(self.sched) is CustomGroup
self.sched.do_resched = False
self.sched.check_schedule(self.sched.nodes[0], 1.0, True)


def prepare_for_reschedule(self) -> None:
"""Update test workers and their status tracking so rescheduling is ready."""
assert type(self.sched) is CustomGroup
assert self.sched is not None
self.remake_nodes = False
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers']
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]][
"group_workers"
]
self.trdist._status = {}
assert self.nodemanager is not None
new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
@@ -295,8 +295,10 @@ def worker_workerfinished(self, node: WorkerController) -> None:
try:
self.prepare_for_reschedule()
except Exception as e:
msg = ("Exception caught during preparation for rescheduling. Giving up."
f"\n{''.join(traceback.format_exception(e))}")
msg = (
"Exception caught during preparation for rescheduling. Giving up."
f"\n{''.join(traceback.format_exception(e))}"
)
self.shouldstop = msg
return
self.config.hook.pytest_testnodedown(node=node, error=None)
@@ -392,7 +394,9 @@ def worker_collectionfinish(
scheduling the first time it logs which scheduler is in use.
"""
if self.shuttingdown:
self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}")
self.report_line(
f"[-] [dse] collectionfinish while closing {node.gateway.id}"
)
return
self.update_worker_status(node, "collected")

@@ -412,7 +416,9 @@ def worker_collectionfinish(
self.trdist.ensure_show_status()
self.terminal.write_line("")
if self.config.option.verbose > 0:
self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}")
self.report_line(
f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}"
)
if isinstance(self.sched, CustomGroup):
if self.ready_to_run_tests and self.are_all_active_nodes_collected():
# we're coming back here after finishing a batch of tests - so start the next batch
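The hunks above reformat the worker-bookkeeping helpers without changing behaviour. As a reading aid, here is a minimal, self-contained sketch of the predicates involved; the dictionary shapes are assumptions for illustration, and only the "collected"/"finished" status strings and the `len(pending) < 2` rule come from the diff itself.

```python
# Illustrative only -- not the dsession.py implementation.
worker_status: dict[str, str] = {"gw0": "collected", "gw1": "finished"}
node2pending: dict[str, list[int]] = {"gw0": [12], "gw1": []}


def is_node_finishing(worker_id: str) -> bool:
    # A worker counts as "finishing" once fewer than two tests remain pending on it.
    pending = node2pending.get(worker_id)
    return pending is not None and len(pending) < 2


def are_all_nodes_finishing() -> bool:
    return all(is_node_finishing(worker_id) for worker_id in node2pending)


def are_all_nodes_done() -> bool:
    return all(status == "finished" for status in worker_status.values())


print(are_all_nodes_finishing(), are_all_nodes_done())  # True False
```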
4 changes: 3 additions & 1 deletion src/xdist/remote.py
@@ -209,7 +209,9 @@ def pytest_collection_modifyitems(
) -> None:
# add the group name to nodeid as suffix if --dist=loadgroup
if config.getvalue("loadgroup") or config.getvalue("customgroup"):
functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom"
functional_mark = (
"xdist_group" if config.getvalue("loadgroup") else "xdist_custom"
)
for item in items:
mark = item.get_closest_marker(functional_mark)
if not mark:
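The hunk above only wraps the mark-name selection; a hedged sketch of the nodeid-suffix idea the surrounding code implements (the helper name is hypothetical, and the exact attribute handling in remote.py may differ):

```python
def suffix_nodeid(nodeid: str, group_name: str) -> str:
    # The CustomGroup scheduler later recovers the group with nodeid.split("@")[-1].
    return f"{nodeid}@{group_name}"


print(suffix_nodeid("xdist-testing-ntop/test.py::test_2", "low_4"))
# xdist-testing-ntop/test.py::test_2@low_4
```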
77 changes: 47 additions & 30 deletions src/xdist/scheduler/customgroup.py
@@ -189,7 +189,6 @@ def mark_test_complete(
self.check_schedule(node, duration=duration)

def mark_test_pending(self, item: str) -> None:

assert self.collection is not None
self.pending.insert(
0,
@@ -205,7 +204,9 @@ def remove_pending_tests_from_node(
) -> None:
raise NotImplementedError()

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
def check_schedule(
self, node: WorkerController, duration: float = 0, from_dsession: bool = False
) -> None:
"""Maybe schedule new items on the node.
If there are any globally pending nodes left then this will
@@ -214,7 +215,9 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
heuristic to influence how many tests the node is assigned.
"""
if node.shutting_down:
self.report_line(f"[-] [csg] {node.workerinput['workerid']} is already shutting down")
self.report_line(
f"[-] [csg] {node.workerinput['workerid']} is already shutting down"
)
return

if self.pending:
@@ -227,18 +230,25 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
if self.pending_groups:
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
nodes = cycle(self.nodes[0 : dist_group["group_workers"]])
schedule_log: dict[str, Any] = {
n.gateway.id: []
for n in self.nodes[0 : dist_group["group_workers"]]
}
for _ in range(len(dist_group["test_indices"])):
n = next(nodes)
#needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1]
# needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key][
"pending_indices"
][:1]
schedule_log[n.gateway.id].extend(tests_per_node)

self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:"
f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}")
message = (
f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:"
f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}"
)
self.report_line(message)

else:
@@ -310,26 +320,28 @@ def schedule(self) -> None:

if self.is_first_time:
for i, test in enumerate(self.collection):
if '@' in test:
group_mark = test.split('@')[-1]
group_workers = int(group_mark.split('_')[-1])
if "@" in test:
group_mark = test.split("@")[-1]
group_workers = int(group_mark.split("_")[-1])
if group_workers > len(self.nodes):
# We can only distribute across as many nodes as we have available
# If a group requests more, we fallback to our actual max
group_workers = len(self.nodes)
else:
group_mark = 'default'
group_mark = "default"
group_workers = len(self.nodes)
existing_tests = dist_groups.get(group_mark, {}).get('tests', [])
existing_tests = dist_groups.get(group_mark, {}).get("tests", [])
existing_tests.append(test)
existing_indices = dist_groups.get(group_mark, {}).get('test_indices', [])
existing_indices = dist_groups.get(group_mark, {}).get(
"test_indices", []
)
existing_indices.append(i)

dist_groups[group_mark] = {
'tests': existing_tests,
'group_workers': group_workers,
'test_indices': existing_indices,
'pending_indices': existing_indices
"tests": existing_tests,
"group_workers": group_workers,
"test_indices": existing_indices,
"pending_indices": existing_indices,
}
self.dist_groups = dist_groups
self.pending_groups = list(dist_groups.keys())
@@ -342,17 +354,21 @@ def schedule(self) -> None:
return
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
nodes = cycle(self.nodes[0 : dist_group["group_workers"]])
schedule_log: dict[str, Any] = {
n.gateway.id: [] for n in self.nodes[0 : dist_group["group_workers"]]
}
for _ in range(len(dist_group["test_indices"])):
n = next(nodes)
# needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1]
tests_per_node = self.dist_groups[dist_group_key]["pending_indices"][:1]
schedule_log[n.gateway.id].extend(tests_per_node)
self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = ("\n[-] [csg] schedule: processed scheduling for "
f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}")
message = (
"\n[-] [csg] schedule: processed scheduling for "
f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}"
)
self.report_line(message)

def _send_tests(self, node: WorkerController, num: int) -> None:
Expand All @@ -362,16 +378,17 @@ def _send_tests(self, node: WorkerController, num: int) -> None:
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)

def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None:
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num]
def _send_tests_group(
self, node: WorkerController, num: int, dist_group_key: str
) -> None:
tests_per_node = self.dist_groups[dist_group_key]["pending_indices"][:num]
if tests_per_node:
del self.dist_groups[dist_group_key]['pending_indices'][:num]
del self.dist_groups[dist_group_key]["pending_indices"][:num]
for test_index in tests_per_node:
self.pending.remove(test_index)
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)


def _check_nodes_have_same_collection(self) -> bool:
"""Return True if all nodes have collected the same items.
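Most of the churn above is ruff line-wrapping, but the grouping logic is easier to follow outside the diff. A self-contained sketch of what check_schedule/schedule do with a mark such as "low_4" (helper names are illustrative; the cap mirrors the `group_workers > len(self.nodes)` branch):

```python
from itertools import cycle


def workers_for_group(group_mark: str, available: int) -> int:
    # "low_4" requests 4 workers; never use more workers than actually exist.
    requested = int(group_mark.split("_")[-1])
    return min(requested, available)


def round_robin(test_indices: list[int], worker_ids: list[str]) -> dict[str, list[int]]:
    # Hand out one test index at a time, cycling over the group's worker budget.
    assignment: dict[str, list[int]] = {wid: [] for wid in worker_ids}
    for index, worker in zip(test_indices, cycle(worker_ids)):
        assignment[worker].append(index)
    return assignment


budget = workers_for_group("low_4", available=2)           # 2 (capped)
print(round_robin([0, 1, 2, 3], ["gw0", "gw1"][:budget]))  # {'gw0': [0, 2], 'gw1': [1, 3]}
```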
2 changes: 1 addition & 1 deletion src/xdist/workermanage.py
@@ -82,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None:
def setup_nodes(
self,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
max_nodes: int | None = None
max_nodes: int | None = None,
) -> list[WorkerController]:
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.trace("setting up nodes")
14 changes: 14 additions & 0 deletions xdist-testing-ntop/test.py
@@ -8,21 +8,25 @@ def test_1():
time.sleep(2)
assert True


@pytest.mark.xdist_custom(name="low_4")
def test_2():
time.sleep(2)
assert True


@pytest.mark.xdist_custom(name="low_4")
def test_3():
time.sleep(2)
assert True


@pytest.mark.xdist_custom(name="low_4")
def test_4():
time.sleep(2)
assert True


# @pytest.mark.xdist_custom(name="low_4")
# def test_4a():
# time.sleep(2)
@@ -48,48 +52,58 @@ def test_4():
# time.sleep(2)
# assert True


@pytest.mark.xdist_custom(name="med_2")
def test_5():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="med_2")
def test_6():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="med_2")
def test_7():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="med_2")
def test_8():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="high_1")
def test_9():
time.sleep(5)
assert True


@pytest.mark.xdist_custom(name="high_1")
def test_10():
time.sleep(5)
assert True


def test_11():
time.sleep(1)
assert True


def test_12():
time.sleep(1)
assert True


def test_13():
time.sleep(1)
assert True


def test_14():
time.sleep(1)
assert True
