Skip to content

Commit

Permalink
Merge pull request #11416 from bluetech/fixtures-getfixtureclosure
Browse files Browse the repository at this point in the history
fixtures: more tweaks
  • Loading branch information
RonnyPfannschmidt authored Sep 8, 2023
2 parents e5c81fa + 6ad9499 commit dd7beb3
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 75 deletions.
62 changes: 24 additions & 38 deletions src/_pytest/doctest.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,20 @@ def __init__(
self,
name: str,
parent: "Union[DoctestTextfile, DoctestModule]",
runner: Optional["doctest.DocTestRunner"] = None,
dtest: Optional["doctest.DocTest"] = None,
runner: "doctest.DocTestRunner",
dtest: "doctest.DocTest",
) -> None:
super().__init__(name, parent)
self.runner = runner
self.dtest = dtest

# Stuff needed for fixture support.
self.obj = None
self.fixture_request: Optional[TopRequest] = None
fm = self.session._fixturemanager
fixtureinfo = fm.getfixtureinfo(node=self, func=None, cls=None)
self._fixtureinfo = fixtureinfo
self.fixturenames = fixtureinfo.names_closure
self._initrequest()

@classmethod
def from_parent( # type: ignore
Expand All @@ -277,19 +283,18 @@ def from_parent( # type: ignore
"""The public named constructor."""
return super().from_parent(name=name, parent=parent, runner=runner, dtest=dtest)

def _initrequest(self) -> None:
self.funcargs: Dict[str, object] = {}
self._request = TopRequest(self, _ispytest=True) # type: ignore[arg-type]

def setup(self) -> None:
if self.dtest is not None:
self.fixture_request = _setup_fixtures(self)
globs = dict(getfixture=self.fixture_request.getfixturevalue)
for name, value in self.fixture_request.getfixturevalue(
"doctest_namespace"
).items():
globs[name] = value
self.dtest.globs.update(globs)
self._request._fillfixtures()
globs = dict(getfixture=self._request.getfixturevalue)
for name, value in self._request.getfixturevalue("doctest_namespace").items():
globs[name] = value
self.dtest.globs.update(globs)

def runtest(self) -> None:
assert self.dtest is not None
assert self.runner is not None
_check_all_skipped(self.dtest)
self._disable_output_capturing_for_darwin()
failures: List["doctest.DocTestFailure"] = []
Expand Down Expand Up @@ -376,7 +381,6 @@ def repr_failure( # type: ignore[override]
return ReprFailDoctest(reprlocation_lines)

def reportinfo(self) -> Tuple[Union["os.PathLike[str]", str], Optional[int], str]:
assert self.dtest is not None
return self.path, self.dtest.lineno, "[doctest] %s" % self.name


Expand All @@ -396,17 +400,17 @@ def _get_flag_lookup() -> Dict[str, int]:
)


def get_optionflags(parent):
optionflags_str = parent.config.getini("doctest_optionflags")
def get_optionflags(config: Config) -> int:
optionflags_str = config.getini("doctest_optionflags")
flag_lookup_table = _get_flag_lookup()
flag_acc = 0
for flag in optionflags_str:
flag_acc |= flag_lookup_table[flag]
return flag_acc


def _get_continue_on_failure(config):
continue_on_failure = config.getvalue("doctest_continue_on_failure")
def _get_continue_on_failure(config: Config) -> bool:
continue_on_failure: bool = config.getvalue("doctest_continue_on_failure")
if continue_on_failure:
# We need to turn off this if we use pdb since we should stop at
# the first failure.
Expand All @@ -429,7 +433,7 @@ def collect(self) -> Iterable[DoctestItem]:
name = self.path.name
globs = {"__name__": "__main__"}

optionflags = get_optionflags(self)
optionflags = get_optionflags(self.config)

runner = _get_runner(
verbose=False,
Expand Down Expand Up @@ -574,7 +578,7 @@ def _from_module(self, module, object):
raise
# Uses internal doctest module parsing mechanism.
finder = MockAwareDocTestFinder()
optionflags = get_optionflags(self)
optionflags = get_optionflags(self.config)
runner = _get_runner(
verbose=False,
optionflags=optionflags,
Expand All @@ -589,24 +593,6 @@ def _from_module(self, module, object):
)


def _setup_fixtures(doctest_item: DoctestItem) -> TopRequest:
"""Used by DoctestTextfile and DoctestItem to setup fixture information."""

def func() -> None:
pass

doctest_item.funcargs = {} # type: ignore[attr-defined]
fm = doctest_item.session._fixturemanager
fixtureinfo = fm.getfixtureinfo(
node=doctest_item, func=func, cls=None, funcargs=False
)
doctest_item._fixtureinfo = fixtureinfo # type: ignore[attr-defined]
doctest_item.fixturenames = fixtureinfo.names_closure # type: ignore[attr-defined]
fixture_request = TopRequest(doctest_item, _ispytest=True) # type: ignore[arg-type]
fixture_request._fillfixtures()
return fixture_request


def _init_checker_class() -> Type["doctest.OutputChecker"]:
import doctest
import re
Expand Down
70 changes: 36 additions & 34 deletions src/_pytest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections import deque
from contextlib import suppress
from pathlib import Path
from typing import AbstractSet
from typing import Any
from typing import Callable
from typing import cast
Expand Down Expand Up @@ -1382,7 +1383,7 @@ def pytest_addoption(parser: Parser) -> None:
)


def _get_direct_parametrize_args(node: nodes.Node) -> List[str]:
def _get_direct_parametrize_args(node: nodes.Node) -> Set[str]:
"""Return all direct parametrization arguments of a node, so we don't
mistake them for fixtures.
Expand All @@ -1391,17 +1392,22 @@ def _get_direct_parametrize_args(node: nodes.Node) -> List[str]:
These things are done later as well when dealing with parametrization
so this could be improved.
"""
parametrize_argnames: List[str] = []
parametrize_argnames: Set[str] = set()
for marker in node.iter_markers(name="parametrize"):
if not marker.kwargs.get("indirect", False):
p_argnames, _ = ParameterSet._parse_parametrize_args(
*marker.args, **marker.kwargs
)
parametrize_argnames.extend(p_argnames)

parametrize_argnames.update(p_argnames)
return parametrize_argnames


def deduplicate_names(*seqs: Iterable[str]) -> Tuple[str, ...]:
"""De-duplicate the sequence of names while keeping the original order."""
# Ideally we would use a set, but it does not preserve insertion order.
return tuple(dict.fromkeys(name for seq in seqs for name in seq))


class FixtureManager:
"""pytest fixture definitions and information is stored and managed
from this class.
Expand Down Expand Up @@ -1454,13 +1460,12 @@ def __init__(self, session: "Session") -> None:
def getfixtureinfo(
self,
node: nodes.Item,
func: Callable[..., object],
func: Optional[Callable[..., object]],
cls: Optional[type],
funcargs: bool = True,
) -> FuncFixtureInfo:
"""Calculate the :class:`FuncFixtureInfo` for an item.
If ``funcargs`` is false, or if the item sets an attribute
If ``func`` is None, or if the item sets an attribute
``nofuncargs = True``, then ``func`` is not examined at all.
:param node:
Expand All @@ -1469,21 +1474,23 @@ def getfixtureinfo(
The item's function.
:param cls:
If the function is a method, the method's class.
:param funcargs:
Whether to look into func's parameters as fixture requests.
"""
if funcargs and not getattr(node, "nofuncargs", False):
if func is not None and not getattr(node, "nofuncargs", False):
argnames = getfuncargnames(func, name=node.name, cls=cls)
else:
argnames = ()
usefixturesnames = self._getusefixturesnames(node)
autousenames = self._getautousenames(node.nodeid)
initialnames = deduplicate_names(autousenames, usefixturesnames, argnames)

usefixtures = tuple(
arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args
)
initialnames = usefixtures + argnames
initialnames, names_closure, arg2fixturedefs = self.getfixtureclosure(
initialnames, node, ignore_args=_get_direct_parametrize_args(node)
direct_parametrize_args = _get_direct_parametrize_args(node)

names_closure, arg2fixturedefs = self.getfixtureclosure(
parentnode=node,
initialnames=initialnames,
ignore_args=direct_parametrize_args,
)

return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs)

def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
Expand Down Expand Up @@ -1515,12 +1522,17 @@ def _getautousenames(self, nodeid: str) -> Iterator[str]:
if basenames:
yield from basenames

def _getusefixturesnames(self, node: nodes.Item) -> Iterator[str]:
"""Return the names of usefixtures fixtures applicable to node."""
for mark in node.iter_markers(name="usefixtures"):
yield from mark.args

def getfixtureclosure(
self,
fixturenames: Tuple[str, ...],
parentnode: nodes.Node,
ignore_args: Sequence[str] = (),
) -> Tuple[Tuple[str, ...], List[str], Dict[str, Sequence[FixtureDef[Any]]]]:
initialnames: Tuple[str, ...],
ignore_args: AbstractSet[str],
) -> Tuple[List[str], Dict[str, Sequence[FixtureDef[Any]]]]:
# Collect the closure of all fixtures, starting with the given
# fixturenames as the initial set. As we have to visit all
# factory definitions anyway, we also return an arg2fixturedefs
Expand All @@ -1529,19 +1541,7 @@ def getfixtureclosure(
# (discovering matching fixtures for a given name/node is expensive).

parentid = parentnode.nodeid
fixturenames_closure = list(self._getautousenames(parentid))

def merge(otherlist: Iterable[str]) -> None:
for arg in otherlist:
if arg not in fixturenames_closure:
fixturenames_closure.append(arg)

merge(fixturenames)

# At this point, fixturenames_closure contains what we call "initialnames",
# which is a set of fixturenames the function immediately requests. We
# need to return it as well, so save this.
initialnames = tuple(fixturenames_closure)
fixturenames_closure = list(initialnames)

arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {}
lastlen = -1
Expand All @@ -1555,7 +1555,9 @@ def merge(otherlist: Iterable[str]) -> None:
fixturedefs = self.getfixturedefs(argname, parentid)
if fixturedefs:
arg2fixturedefs[argname] = fixturedefs
merge(fixturedefs[-1].argnames)
for arg in fixturedefs[-1].argnames:
if arg not in fixturenames_closure:
fixturenames_closure.append(arg)

def sort_by_scope(arg_name: str) -> Scope:
try:
Expand All @@ -1566,7 +1568,7 @@ def sort_by_scope(arg_name: str) -> Scope:
return fixturedefs[-1]._scope

fixturenames_closure.sort(key=sort_by_scope, reverse=True)
return initialnames, fixturenames_closure, arg2fixturedefs
return fixturenames_closure, arg2fixturedefs

def pytest_generate_tests(self, metafunc: "Metafunc") -> None:
"""Generate new tests based on parametrized fixtures used by the given metafunc"""
Expand Down
5 changes: 2 additions & 3 deletions src/_pytest/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,9 +1800,8 @@ def __init__(
self.keywords.update(keywords)

if fixtureinfo is None:
fixtureinfo = self.session._fixturemanager.getfixtureinfo(
self, self.obj, self.cls, funcargs=True
)
fm = self.session._fixturemanager
fixtureinfo = fm.getfixtureinfo(self, self.obj, self.cls)
self._fixtureinfo: FuncFixtureInfo = fixtureinfo
self.fixturenames = fixtureinfo.names_closure
self._initrequest()
Expand Down
8 changes: 8 additions & 0 deletions testing/python/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
from _pytest.compat import getfuncargnames
from _pytest.config import ExitCode
from _pytest.fixtures import deduplicate_names
from _pytest.fixtures import TopRequest
from _pytest.monkeypatch import MonkeyPatch
from _pytest.pytester import get_public_names
Expand Down Expand Up @@ -4531,3 +4532,10 @@ def test_fixt(custom):
result.assert_outcomes(errors=1)
result.stdout.fnmatch_lines([expected])
assert result.ret == ExitCode.TESTS_FAILED


def test_deduplicate_names() -> None:
items = deduplicate_names("abacd")
assert items == ("a", "b", "c", "d")
items = deduplicate_names(items + ("g", "f", "g", "e", "b"))
assert items == ("a", "b", "c", "d", "g", "f", "e")

0 comments on commit dd7beb3

Please sign in to comment.