diff --git a/_logot_pytest.py b/_logot_pytest.py new file mode 100644 index 00000000..26e8bf34 --- /dev/null +++ b/_logot_pytest.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from collections.abc import Generator +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + # Don't import `logot` until the fixture runs. + # This avoids problem with pytest coverage reporting. + from logot import Logot + + +@pytest.fixture() +def logot() -> Generator[Logot, None, None]: + from logot import Logot + + with Logot().capturing() as logot: + yield logot diff --git a/logot/__init__.py b/logot/__init__.py index 9d48db4f..9b3b47a9 100644 --- a/logot/__init__.py +++ b/logot/__init__.py @@ -1 +1,4 @@ from __future__ import annotations + +from logot._logged import Logged as Logged +from logot._logot import Logot as Logot diff --git a/logot/_logged.py b/logot/_logged.py new file mode 100644 index 00000000..415c3af5 --- /dev/null +++ b/logot/_logged.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod + +from logot._match import compile_matcher +from logot._util import to_levelno + + +class Logged(ABC): + __slots__ = () + + def __rshift__(self, log: Logged) -> Logged: + return _OrderedAllLogged.from_compose(self, log) + + def __and__(self, log: Logged) -> Logged: + return _UnorderedAllLogged.from_compose(self, log) + + def __or__(self, log: Logged) -> Logged: + return _AnyLogged.from_compose(self, log) + + def __str__(self) -> str: + return self._str(indent="") + + @abstractmethod + def __eq__(self, other: object) -> bool: + raise NotImplementedError + + @abstractmethod + def __repr__(self) -> str: + raise NotImplementedError + + @abstractmethod + def _reduce(self, record: logging.LogRecord) -> Logged | None: + raise NotImplementedError + + @abstractmethod + def _str(self, *, indent: str) -> str: + raise NotImplementedError + + +def log(level: int | str, msg: str) -> Logged: + return _LogRecordLogged(to_levelno(level), msg) + + +def debug(msg: str) -> Logged: + return _LogRecordLogged(logging.DEBUG, msg) + + +def info(msg: str) -> Logged: + return _LogRecordLogged(logging.INFO, msg) + + +def warning(msg: str) -> Logged: + return _LogRecordLogged(logging.WARNING, msg) + + +def error(msg: str) -> Logged: + return _LogRecordLogged(logging.ERROR, msg) + + +def critical(msg: str) -> Logged: + return _LogRecordLogged(logging.CRITICAL, msg) + + +class _LogRecordLogged(Logged): + __slots__ = ("_levelno", "_msg", "_matcher") + + def __init__(self, levelno: int, msg: str) -> None: + self._levelno = levelno + self._msg = msg + self._matcher = compile_matcher(msg) + + def __eq__(self, other: object) -> bool: + return isinstance(other, _LogRecordLogged) and other._levelno == self._levelno and other._msg == self._msg + + def __repr__(self) -> str: + return f"log({logging.getLevelName(self._levelno)!r}, {self._msg!r})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + if self._levelno == record.levelno and self._matcher(record.getMessage()): + return None + return self + + def _str(self, *, indent: str) -> str: + return f"[{logging.getLevelName(self._levelno)}] {self._msg}" + + +class _ComposedLogged(Logged): + __slots__ = ("_logs",) + + def __init__(self, logs: tuple[Logged, ...]) -> None: + assert len(logs) > 1 + self._logs = logs + + def __eq__(self, other: object) -> bool: + return isinstance(other, self.__class__) and other._logs == self._logs + + @classmethod + def from_compose(cls, log_a: Logged, log_b: Logged) -> Logged: + # If possible, flatten nested logs of the same type. + if isinstance(log_a, cls): + if isinstance(log_b, cls): + return cls((*log_a._logs, *log_b._logs)) + return cls((*log_a._logs, log_b)) + if isinstance(log_b, cls): + return cls((log_a, *log_b._logs)) + # Wrap the logs without flattening. + return cls((log_a, log_b)) + + @classmethod + def from_reduce(cls, logs: tuple[Logged, ...]) -> Logged | None: + assert logs + # If there is a single log, do not wrap it. + if len(logs) == 1: + return logs[0] + # Wrap the logs. + return cls(logs) + + +class _OrderedAllLogged(_ComposedLogged): + __slots__ = () + + def __repr__(self) -> str: + return f"({' >> '.join(map(repr, self._logs))})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + log = self._logs[0] + reduced_log = log._reduce(record) + # Handle full reduction. + if reduced_log is None: + return _OrderedAllLogged.from_reduce(self._logs[1:]) + # Handle partial reduction. + if reduced_log is not log: + return _OrderedAllLogged((reduced_log, *self._logs[1:])) + # Handle no reduction. + return self + + def _str(self, *, indent: str) -> str: + return f"\n{indent}".join(log._str(indent=indent) for log in self._logs) + + +class _UnorderedAllLogged(_ComposedLogged): + __slots__ = () + + def __repr__(self) -> str: + return f"({' & '.join(map(repr, self._logs))})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + for n, log in enumerate(self._logs): + reduced_log = log._reduce(record) + # Handle full reduction. + if reduced_log is None: + return _UnorderedAllLogged.from_reduce((*self._logs[:n], *self._logs[n + 1 :])) + # Handle partial reduction. + if reduced_log is not log: + return _UnorderedAllLogged((*self._logs[:n], reduced_log, *self._logs[n + 1 :])) + # Handle no reduction. + return self + + def _str(self, *, indent: str) -> str: + nested_indent = indent + " " + logs_str = "".join(f"\n{indent}- {log._str(indent=nested_indent)}" for log in self._logs) + return f"Unordered:{logs_str}" + + +class _AnyLogged(_ComposedLogged): + __slots__ = () + + def __repr__(self) -> str: + return f"({' | '.join(map(repr, self._logs))})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + for n, log in enumerate(self._logs): + reduced_log = log._reduce(record) + # Handle full reduction. + if reduced_log is None: + return None + # Handle partial reduction. + if reduced_log is not log: + return _AnyLogged((*self._logs[:n], reduced_log, *self._logs[n + 1 :])) + # Handle no reduction. + return self + + def _str(self, *, indent: str) -> str: + nested_indent = indent + " " + logs_str = "".join(f"\n{indent}- {log._str(indent=nested_indent)}" for log in self._logs) + return f"Any:{logs_str}" diff --git a/logot/_logot.py b/logot/_logot.py new file mode 100644 index 00000000..3483b5d1 --- /dev/null +++ b/logot/_logot.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import logging +from collections import deque +from contextlib import AbstractContextManager +from threading import Lock +from types import TracebackType +from typing import ClassVar, TypeVar +from weakref import WeakValueDictionary + +from logot._logged import Logged +from logot._util import to_levelno, to_logger, to_timeout +from logot._waiter import AsyncWaiter, SyncWaiter, Waiter + +W = TypeVar("W", bound=Waiter) + + +class Logot: + __slots__ = ("_timeout", "_lock", "_seen_records", "_queue", "_waiter") + + DEFAULT_LEVEL: ClassVar[int | str] = logging.NOTSET + DEFAULT_LOGGER: ClassVar[logging.Logger | str | None] = None + DEFAULT_TIMEOUT: ClassVar[float] = 3.0 + + def __init__( + self, + *, + timeout: float = DEFAULT_TIMEOUT, + ) -> None: + self._timeout = to_timeout(timeout) + self._lock = Lock() + self._seen_records: WeakValueDictionary[int, logging.LogRecord] = WeakValueDictionary() + self._queue: deque[logging.LogRecord] = deque() + self._waiter: Waiter | None = None + + def capturing( + self, + *, + level: int | str = DEFAULT_LEVEL, + logger: logging.Logger | str | None = DEFAULT_LOGGER, + ) -> AbstractContextManager[Logot]: + levelno = to_levelno(level) + logger = to_logger(logger) + return _Capturing(self, _Handler(self, levelno=levelno), logger=logger) + + def assert_logged(self, log: Logged) -> None: + reduced_log = self._reduce(log) + if reduced_log is not None: + raise AssertionError(f"Not logged:\n\n{reduced_log}") + + def assert_not_logged(self, log: Logged) -> None: + reduced_log = self._reduce(log) + if reduced_log is None: + raise AssertionError(f"Logged:\n\n{log}") + + def wait_for(self, log: Logged, *, timeout: float | None = None) -> None: + waiter = self._open_waiter(log, SyncWaiter, timeout=timeout) + try: + waiter.wait() + finally: + self._close_waiter(waiter) + + async def await_for(self, log: Logged, *, timeout: float | None = None) -> None: + waiter = self._open_waiter(log, AsyncWaiter, timeout=timeout) + try: + await waiter.wait() + finally: + self._close_waiter(waiter) + + def _emit(self, record: logging.LogRecord) -> None: + with self._lock: + # De-duplicate log records. + # Duplicate log records are possible if we have multiple active captures. + record_id = id(record) + if record_id in self._seen_records: # pragma: no cover + return + self._seen_records[record_id] = record + # If there is a waiter that has not been fully reduced, attempt to reduce it. + if self._waiter is not None and self._waiter.log is not None: + self._waiter.log = self._waiter.log._reduce(record) + # If the waiter has fully reduced, notify the blocked caller. + if self._waiter.log is None: + self._waiter.notify() + return + # Otherwise, buffer the log record. + self._queue.append(record) + + def _reduce(self, log: Logged | None) -> Logged | None: + # Drain the queue until the log is fully reduced. + # This does not need a lock, since `deque.popleft()` is thread-safe. + while self._queue and log is not None: + log = log._reduce(self._queue.popleft()) + # All done! + return log + + def _open_waiter(self, log: Logged, waiter_cls: type[W], *, timeout: float | None) -> W: + with self._lock: + # If no timeout is provided, use the default timeout. + # Otherwise, validate and use the provided timeout. + if timeout is None: + timeout = self._timeout + else: + timeout = to_timeout(timeout) + # Ensure no other waiters. + if self._waiter is not None: # pragma: no cover + raise RuntimeError("Multiple waiters are not supported") + # Set a waiter. + waiter = self._waiter = waiter_cls(log, timeout=timeout) + # Apply an immediate reduction. + waiter.log = self._reduce(waiter.log) + if waiter.log is None: + waiter.notify() + # All done! + return waiter + + def _close_waiter(self, waiter: Waiter) -> None: + with self._lock: + # Clear the waiter. + self._waiter = None + # Error if the waiter logs are not fully reduced. + if waiter.log is not None: + raise AssertionError(f"Not logged:\n\n{waiter.log}") + + +class _Capturing: + __slots__ = ("_logot", "_handler", "_logger", "_prev_levelno") + + def __init__(self, logot: Logot, handler: logging.Handler, *, logger: logging.Logger) -> None: + self._logot = logot + self._handler = handler + self._logger = logger + + def __enter__(self) -> Logot: + # If the logger is less verbose than the handler, force it to the necessary verboseness. + self._prev_levelno = self._logger.level + if self._handler.level < self._logger.level: + self._logger.setLevel(self._handler.level) + # Add the handler. + self._logger.addHandler(self._handler) + return self._logot + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + # Remove the handler. + self._logger.removeHandler(self._handler) + # Restore the previous level. + self._logger.setLevel(self._prev_levelno) + + +class _Handler(logging.Handler): + __slots__ = ("_logot",) + + def __init__(self, logot: Logot, *, levelno: int) -> None: + super().__init__(levelno) + self._logot = logot + + def emit(self, record: logging.LogRecord) -> None: + self._logot._emit(record) diff --git a/logot/_match.py b/logot/_match.py index 4e0694ee..c60a81e7 100644 --- a/logot/_match.py +++ b/logot/_match.py @@ -1,11 +1,12 @@ from __future__ import annotations import re -from functools import partial from typing import Callable # Compiled matcher callable. -Matcher = Callable[[str], bool] +# The returned `object` is truthy on successful match and falsy on failed match. +# TODO: Use `TypeAlias` when we only need to support Python 3.10+. +Matcher = Callable[[str], object] # Regex matching a simplified conversion specifier. _RE_CONVERSION = re.compile(r"%(.|$)") @@ -39,10 +40,6 @@ } -def _match_regex(pattern: re.Pattern[str], msg: str) -> bool: - return pattern.fullmatch(msg) is not None - - def compile_matcher(pattern: str) -> Matcher: parts: list[str] = _RE_CONVERSION.split(pattern) parts_len = len(parts) @@ -62,7 +59,7 @@ def compile_matcher(pattern: str) -> Matcher: # Create regex matcher. if is_regex: parts[::2] = map(re.escape, parts[::2]) - return partial(_match_regex, re.compile("".join(parts), re.DOTALL)) + return re.compile("".join(parts), re.DOTALL).fullmatch # Recreate the pattern with all escape sequences replaced. pattern = "".join(parts) # Create simple matcher. diff --git a/logot/_util.py b/logot/_util.py index 0ec89239..e1526fe1 100644 --- a/logot/_util.py +++ b/logot/_util.py @@ -15,5 +15,26 @@ def to_levelno(level: int | str) -> int: if not isinstance(levelno, int): raise ValueError(f"Unknown level: {level!r}") return levelno - # Fail on other types. + # Handle invalid level. raise TypeError(f"Invalid level: {level!r}") + + +def to_logger(logger: logging.Logger | str | None) -> logging.Logger: + # Handle `None` or `str` logger. + if logger is None or isinstance(logger, str): + return logging.getLogger(logger) + # Handle `Logger` logger. + if isinstance(logger, logging.Logger): + return logger + # Handle invalid logger. + raise TypeError(f"Invalid logger: {logger!r}") + + +def to_timeout(timeout: float) -> float: + # Handle numeric timeout. + if isinstance(timeout, (float, int)): + if timeout >= 0.0: + return float(timeout) + raise ValueError(f"Invalid timeout: {timeout!r}") + # Handle invalid timeout. + raise TypeError(f"Invalid timeout: {timeout!r}") diff --git a/logot/_waiter.py b/logot/_waiter.py new file mode 100644 index 00000000..5cf950f7 --- /dev/null +++ b/logot/_waiter.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import asyncio +from abc import ABC, abstractmethod +from threading import Lock + +from logot._logged import Logged + + +class Waiter(ABC): + __slots__ = ("log", "_timeout") + + def __init__(self, log: Logged, *, timeout: float) -> None: + self.log: Logged | None = log + self._timeout = timeout + + @abstractmethod + def notify(self) -> None: + raise NotImplementedError + + @abstractmethod + def wait(self) -> object: + raise NotImplementedError + + +class SyncWaiter(Waiter): + __slots__ = ("_lock",) + + def __init__(self, log: Logged, *, timeout: float) -> None: + super().__init__(log, timeout=timeout) + # Create an already-acquired lock. This will be released by `notify()`. + self._lock = Lock() + self._lock.acquire() + + def notify(self) -> None: + self._lock.release() + + def wait(self) -> None: + self._lock.acquire(timeout=self._timeout) + + +class AsyncWaiter(Waiter): + __slots__ = ("_loop", "_future") + + def __init__(self, log: Logged, *, timeout: float) -> None: + super().__init__(log, timeout=timeout) + # Create an unresolved `asyncio.Future`. This will be resolved by `notify()`. + self._loop = asyncio.get_running_loop() + self._future: asyncio.Future[None] = self._loop.create_future() + + def notify(self) -> None: + self._loop.call_soon_threadsafe(self._resolve) + + async def wait(self) -> None: + timer = self._loop.call_later(self._timeout, self._resolve) + try: + await self._future + finally: + timer.cancel() + + def _resolve(self) -> None: + try: + self._future.set_result(None) + except asyncio.InvalidStateError: # pragma: no cover + # It's possible that the timeout and the `notify()` will both occur in the same tick of the event loop. + pass diff --git a/logot/logged.py b/logot/logged.py index 4a7834ca..626aca02 100644 --- a/logot/logged.py +++ b/logot/logged.py @@ -1,188 +1,8 @@ from __future__ import annotations -import logging -from abc import ABC, abstractmethod - -from logot._match import compile_matcher -from logot._util import to_levelno - - -class Logged(ABC): - __slots__ = () - - def __rshift__(self, log: Logged) -> Logged: - return _OrderedAllLogged.from_compose(self, log) - - def __and__(self, log: Logged) -> Logged: - return _UnorderedAllLogged.from_compose(self, log) - - def __or__(self, log: Logged) -> Logged: - return _AnyLogged.from_compose(self, log) - - def __str__(self) -> str: - return self._str(indent="") - - @abstractmethod - def __eq__(self, other: object) -> bool: - raise NotImplementedError - - @abstractmethod - def __repr__(self) -> str: - raise NotImplementedError - - @abstractmethod - def _reduce(self, record: logging.LogRecord) -> Logged | None: - raise NotImplementedError - - @abstractmethod - def _str(self, *, indent: str) -> str: - raise NotImplementedError - - -class _LogRecordLogged(Logged): - __slots__ = ("_levelno", "_msg", "_matcher") - - def __init__(self, levelno: int, msg: str) -> None: - self._levelno = levelno - self._msg = msg - self._matcher = compile_matcher(msg) - - def __eq__(self, other: object) -> bool: - return isinstance(other, _LogRecordLogged) and other._levelno == self._levelno and other._msg == self._msg - - def __repr__(self) -> str: - return f"log({logging.getLevelName(self._levelno)!r}, {self._msg!r})" - - def _reduce(self, record: logging.LogRecord) -> Logged | None: - if self._levelno == record.levelno and self._matcher(record.getMessage()): - return None - return self - - def _str(self, *, indent: str) -> str: - return f"[{logging.getLevelName(self._levelno)}] {self._msg}" - - -def log(level: int | str, msg: str) -> Logged: - return _LogRecordLogged(to_levelno(level), msg) - - -def debug(msg: str) -> Logged: - return _LogRecordLogged(logging.DEBUG, msg) - - -def info(msg: str) -> Logged: - return _LogRecordLogged(logging.INFO, msg) - - -def warning(msg: str) -> Logged: - return _LogRecordLogged(logging.WARNING, msg) - - -def error(msg: str) -> Logged: - return _LogRecordLogged(logging.ERROR, msg) - - -def critical(msg: str) -> Logged: - return _LogRecordLogged(logging.CRITICAL, msg) - - -class _ComposedLogged(Logged): - __slots__ = ("_logs",) - - def __init__(self, logs: tuple[Logged, ...]) -> None: - assert len(logs) > 1, "Unreachable" - self._logs = logs - - def __eq__(self, other: object) -> bool: - return isinstance(other, self.__class__) and other._logs == self._logs - - @classmethod - def from_compose(cls, log_a: Logged, log_b: Logged) -> Logged: - # If possible, flatten nested logs of the same type. - if isinstance(log_a, cls): - if isinstance(log_b, cls): - return cls((*log_a._logs, *log_b._logs)) - return cls((*log_a._logs, log_b)) - if isinstance(log_b, cls): - return cls((log_a, *log_b._logs)) - # Wrap the logs without flattening. - return cls((log_a, log_b)) - - @classmethod - def from_reduce(cls, logs: tuple[Logged, ...]) -> Logged | None: - assert logs, "Unreachable" - # If there is a single log, do not wrap it. - if len(logs) == 1: - return logs[0] - # Wrap the logs. - return cls(logs) - - -class _OrderedAllLogged(_ComposedLogged): - __slots__ = () - - def __repr__(self) -> str: - return f"({' >> '.join(map(repr, self._logs))})" - - def _reduce(self, record: logging.LogRecord) -> Logged | None: - log = self._logs[0] - reduced_log = log._reduce(record) - # Handle full reduction. - if reduced_log is None: - return _OrderedAllLogged.from_reduce(self._logs[1:]) - # Handle partial reduction. - if reduced_log is not log: - return _OrderedAllLogged((reduced_log, *self._logs[1:])) - # Handle no reduction. - return self - - def _str(self, *, indent: str) -> str: - return f"\n{indent}".join(log._str(indent=indent) for log in self._logs) - - -class _UnorderedAllLogged(_ComposedLogged): - __slots__ = () - - def __repr__(self) -> str: - return f"({' & '.join(map(repr, self._logs))})" - - def _reduce(self, record: logging.LogRecord) -> Logged | None: - for n, log in enumerate(self._logs): - reduced_log = log._reduce(record) - # Handle full reduction. - if reduced_log is None: - return _UnorderedAllLogged.from_reduce((*self._logs[:n], *self._logs[n + 1 :])) - # Handle partial reduction. - if reduced_log is not log: - return _UnorderedAllLogged((*self._logs[:n], reduced_log, *self._logs[n + 1 :])) - # Handle no reduction. - return self - - def _str(self, *, indent: str) -> str: - nested_indent = indent + " " - logs_str = "".join(f"\n{indent}- {log._str(indent=nested_indent)}" for log in self._logs) - return f"Unordered:{logs_str}" - - -class _AnyLogged(_ComposedLogged): - __slots__ = () - - def __repr__(self) -> str: - return f"({' | '.join(map(repr, self._logs))})" - - def _reduce(self, record: logging.LogRecord) -> Logged | None: - for n, log in enumerate(self._logs): - reduced_log = log._reduce(record) - # Handle full reduction. - if reduced_log is None: - return None - # Handle partial reduction. - if reduced_log is not log: - return _AnyLogged((*self._logs[:n], reduced_log, *self._logs[n + 1 :])) - # Handle no reduction. - return self - - def _str(self, *, indent: str) -> str: - nested_indent = indent + " " - logs_str = "".join(f"\n{indent}- {log._str(indent=nested_indent)}" for log in self._logs) - return f"Any:{logs_str}" +from logot._logged import critical as critical +from logot._logged import debug as debug +from logot._logged import error as error +from logot._logged import info as info +from logot._logged import log as log +from logot._logged import warning as warning diff --git a/poetry.lock b/poetry.lock index 73d9afa7..f6dd288a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -297,13 +297,13 @@ sphinx-basic-ng = "*" [[package]] name = "hypothesis" -version = "6.96.2" +version = "6.96.3" description = "A library for property-based testing" optional = false python-versions = ">=3.8" files = [ - {file = "hypothesis-6.96.2-py3-none-any.whl", hash = "sha256:34b5fb4c487f159083fa5db186b4357b42a27e597795e9cbccead8cde0242060"}, - {file = "hypothesis-6.96.2.tar.gz", hash = "sha256:524a0ac22c8dfff640f21f496b85ee193a470e8570ab7707b8e3bfccd7da34a6"}, + {file = "hypothesis-6.96.3-py3-none-any.whl", hash = "sha256:f3b7026d1f376933141519bf66f445691ede19fb842555c092d8a7dcb08545ef"}, + {file = "hypothesis-6.96.3.tar.gz", hash = "sha256:96684a988f35c4d308cbedb841cabc8a63caa0aa612b2221651c5fb3d0d277e2"}, ] [package.dependencies] @@ -582,6 +582,24 @@ tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} [package.extras] testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-asyncio" +version = "0.23.3" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-asyncio-0.23.3.tar.gz", hash = "sha256:af313ce900a62fbe2b1aed18e37ad757f1ef9940c6b6a88e2954de38d6b1fb9f"}, + {file = "pytest_asyncio-0.23.3-py3-none-any.whl", hash = "sha256:37a9d912e8338ee7b4a3e917381d1c95bfc8682048cb0fbc35baba316ec1faba"}, +] + +[package.dependencies] +pytest = ">=7.0.0" + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + [[package]] name = "pytest-cov" version = "4.1.0" @@ -919,4 +937,4 @@ pytest = ["pytest"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "d0336ec998244915ebd3607918ac9d7d7af70ad05338df4ddcc7a579c3e6d863" +content-hash = "b4c713ba4144b9b7ff916e6c6f6747e434651cc4126c82a8c17bbbc3b2223dbb" diff --git a/pyproject.toml b/pyproject.toml index 379317fe..01624c44 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "logot" -version = "0.0.1a4" +version = "0.0.1a5" description = "Log-based testing" authors = ["Dave Hall "] license = "MIT" @@ -15,7 +15,7 @@ classifiers = [ "Topic :: Software Development :: Testing :: Unit", "Topic :: System :: Logging", ] -packages = [{ include = "logot" }] +packages = [{ include = "logot" }, { include = "_logot_pytest.py" }] [tool.poetry.dependencies] python = "^3.8" @@ -29,6 +29,7 @@ hypothesis = "^6.96.1" mypy = "^1.8.0" ruff = "^0.1.11" pytest = "*" +pytest-asyncio = "^0.23.3" pytest-cov = "^4.1.0" [tool.poetry.group.docs] @@ -39,6 +40,9 @@ furo = { version = "*", markers = "python_version >= '3.12'" } sphinx = { version = "7.2.6", markers = "python_version >= '3.12'" } sphinx-autobuild = { version = "*", markers = "python_version >= '3.12'" } +[tool.poetry.plugins.pytest11] +logot = "_logot_pytest" + [tool.coverage.run] source = ["logot", "tests"] @@ -59,6 +63,7 @@ strict = true testpaths = ["tests"] console_output_style = "classic" addopts = "--tb=native --cov" +asyncio_mode = "auto" [tool.ruff] include = ["docs/**/*.py", "logot/**/*.py", "tests/**/*.py"] diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..9c2582e5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import logging +import threading +from collections.abc import Generator +from contextlib import contextmanager +from time import sleep + +logger = logging.getLogger("logot") + + +def lines(*lines: str) -> str: + return "\n".join(lines) + + +@contextmanager +def log_soon(level: int, msg: str) -> Generator[None, None, None]: + thread = threading.Thread(target=_log_soon, args=(level, msg), daemon=True) + thread.start() + try: + yield + finally: + thread.join() + + +def _log_soon(level: int, msg: str) -> None: + sleep(0.1) + logger.log(level, msg) diff --git a/tests/test_logged.py b/tests/test_logged.py index 7f2464be..8ad7bb7d 100644 --- a/tests/test_logged.py +++ b/tests/test_logged.py @@ -2,14 +2,15 @@ import logging -from logot import logged +from logot import Logged, logged +from tests import lines def record(level: int, msg: str) -> logging.LogRecord: return logging.LogRecord(name="logot", level=level, pathname=__file__, lineno=0, msg=msg, args=(), exc_info=None) -def assert_reduce(log: logged.Logged | None, *records: logging.LogRecord) -> None: +def assert_reduce(log: Logged | None, *records: logging.LogRecord) -> None: for record in records: # The `Logged` should not have been fully reduced. assert log is not None @@ -86,11 +87,9 @@ def test_ordered_all_logged_repr() -> None: def test_ordered_all_logged_str() -> None: - assert str(logged.info("foo") >> logged.info("bar")) == "\n".join( - ( - "[INFO] foo", - "[INFO] bar", - ) + assert str(logged.info("foo") >> logged.info("bar")) == lines( + "[INFO] foo", + "[INFO] bar", ) # Indentation is sane with multiple nested composed `Logged`. assert str( @@ -99,21 +98,19 @@ def test_ordered_all_logged_str() -> None: (logged.info("bar1a") | logged.info("bar1b")) & ((logged.info("bar2a1") >> logged.info("bar2a2")) | (logged.info("bar2b1") >> logged.info("bar2b2"))) ) - ) == "\n".join( - ( - "Unordered:", - "- [INFO] foo1", - "- [INFO] foo2", - "Unordered:", - "- Any:", - " - [INFO] bar1a", - " - [INFO] bar1b", - "- Any:", - " - [INFO] bar2a1", - " [INFO] bar2a2", - " - [INFO] bar2b1", - " [INFO] bar2b2", - ) + ) == lines( + "Unordered:", + "- [INFO] foo1", + "- [INFO] foo2", + "Unordered:", + "- Any:", + " - [INFO] bar1a", + " - [INFO] bar1b", + "- Any:", + " - [INFO] bar2a1", + " [INFO] bar2a2", + " - [INFO] bar2b1", + " [INFO] bar2b2", ) @@ -170,12 +167,10 @@ def test_unordered_all_logged_repr() -> None: def test_unordered_all_logged_str() -> None: - assert str(logged.info("foo") & logged.info("bar")) == "\n".join( - ( - "Unordered:", - "- [INFO] foo", - "- [INFO] bar", - ) + assert str(logged.info("foo") & logged.info("bar")) == lines( + "Unordered:", + "- [INFO] foo", + "- [INFO] bar", ) # Indentation is sane with multiple nested composed `Logged`. assert str( @@ -184,20 +179,18 @@ def test_unordered_all_logged_str() -> None: (logged.info("bar1a") | logged.info("bar1b")) >> ((logged.info("bar2a1") >> logged.info("bar2a2")) | (logged.info("bar2b1") >> logged.info("bar2b2"))) ) - ) == "\n".join( - ( - "Unordered:", - "- [INFO] foo1", - " [INFO] foo2", - "- Any:", - " - [INFO] bar1a", - " - [INFO] bar1b", - " Any:", - " - [INFO] bar2a1", - " [INFO] bar2a2", - " - [INFO] bar2b1", - " [INFO] bar2b2", - ) + ) == lines( + "Unordered:", + "- [INFO] foo1", + " [INFO] foo2", + "- Any:", + " - [INFO] bar1a", + " - [INFO] bar1b", + " Any:", + " - [INFO] bar2a1", + " [INFO] bar2a2", + " - [INFO] bar2b1", + " [INFO] bar2b2", ) @@ -252,12 +245,10 @@ def test_any_logged_repr() -> None: def test_any_logged_str() -> None: - assert str(logged.info("foo") | logged.info("bar")) == "\n".join( - ( - "Any:", - "- [INFO] foo", - "- [INFO] bar", - ) + assert str(logged.info("foo") | logged.info("bar")) == lines( + "Any:", + "- [INFO] foo", + "- [INFO] bar", ) # Indentation is sane with multiple nested composed `Logged`. assert str( @@ -266,20 +257,18 @@ def test_any_logged_str() -> None: (logged.info("bar1a") & logged.info("bar1b")) >> ((logged.info("bar2a1") >> logged.info("bar2a2")) & (logged.info("bar2b1") >> logged.info("bar2b2"))) ) - ) == "\n".join( - ( - "Any:", - "- [INFO] foo1", - " [INFO] foo2", - "- Unordered:", - " - [INFO] bar1a", - " - [INFO] bar1b", - " Unordered:", - " - [INFO] bar2a1", - " [INFO] bar2a2", - " - [INFO] bar2b1", - " [INFO] bar2b2", - ) + ) == lines( + "Any:", + "- [INFO] foo1", + " [INFO] foo2", + "- Unordered:", + " - [INFO] bar1a", + " - [INFO] bar1b", + " Unordered:", + " - [INFO] bar2a1", + " [INFO] bar2a2", + " - [INFO] bar2b1", + " [INFO] bar2b2", ) diff --git a/tests/test_logot.py b/tests/test_logot.py new file mode 100644 index 00000000..2814d57f --- /dev/null +++ b/tests/test_logot.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import logging + +import pytest + +from logot import Logot, logged +from tests import lines, log_soon, logger + + +def test_capturing() -> None: + assert logger.level == logging.NOTSET + # Set a fairly non-verbose log level. + logger.setLevel(logging.WARNING) + try: + with Logot().capturing(level=logging.DEBUG, logger=logger) as logot: + assert isinstance(logot, Logot) + # The logger will have been overridden for the required verbosity. + assert logger.level == logging.DEBUG + # When the capture ends, the logging verbosity is restored. + assert logger.level == logging.WARNING + finally: + # Whatever this test does, reset the logger to what it was! + logger.setLevel(logging.NOTSET) + + +def test_assert_logged_pass(logot: Logot) -> None: + logger.info("foo bar") + logot.assert_logged(logged.info("foo bar")) + + +def test_assert_logged_fail(logot: Logot) -> None: + logger.info("boom!") + with pytest.raises(AssertionError) as ex: + logot.assert_logged(logged.info("foo bar")) + assert str(ex.value) == lines( + "Not logged:", + "", + "[INFO] foo bar", + ) + + +def test_assert_not_logged_pass(logot: Logot) -> None: + logger.info("boom!") + logot.assert_not_logged(logged.info("foo bar")) + + +def test_assert_not_logged_fail(logot: Logot) -> None: + logger.info("foo bar") + with pytest.raises(AssertionError) as ex: + logot.assert_not_logged(logged.info("foo bar")) + assert str(ex.value) == lines( + "Logged:", + "", + "[INFO] foo bar", + ) + + +def test_wait_for_pass_immediate(logot: Logot) -> None: + logger.info("foo bar") + logot.wait_for(logged.info("foo bar")) + + +def test_wait_for_pass_soon(logot: Logot) -> None: + with log_soon(logging.INFO, "foo bar"): + logot.wait_for(logged.info("foo bar")) + + +def test_wait_for_fail(logot: Logot) -> None: + logger.info("boom!") + with pytest.raises(AssertionError) as ex: + logot.wait_for(logged.info("foo bar"), timeout=0.1) + assert str(ex.value) == lines( + "Not logged:", + "", + "[INFO] foo bar", + ) + + +async def test_await_for_pass_immediate(logot: Logot) -> None: + logger.info("foo bar") + await logot.await_for(logged.info("foo bar")) + + +async def test_await_for_pass_soon(logot: Logot) -> None: + with log_soon(logging.INFO, "foo bar"): + await logot.await_for(logged.info("foo bar")) + + +async def test_await_for_fail(logot: Logot) -> None: + logger.info("boom!") + with pytest.raises(AssertionError) as ex: + await logot.await_for(logged.info("foo bar"), timeout=0.1) + assert str(ex.value) == lines( + "Not logged:", + "", + "[INFO] foo bar", + ) diff --git a/tests/test_util.py b/tests/test_util.py index 6534f0a7..02d2e836 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -5,7 +5,8 @@ import pytest -from logot._util import to_levelno +from logot._util import to_levelno, to_logger, to_timeout +from tests import logger def test_to_levelno_int_pass() -> None: @@ -32,3 +33,37 @@ def test_to_levelno_type_fail() -> None: with pytest.raises(TypeError) as ex: to_levelno(cast(int, 1.5)) assert str(ex.value) == "Invalid level: 1.5" + + +def test_to_logger_none_pass() -> None: + assert to_logger(None) is logging.getLogger() + + +def test_to_logger_str_pass() -> None: + assert to_logger("logot") is logger + + +def test_to_logger_logger_pass() -> None: + assert to_logger(logging.getLogger("logot")) is logger + + +def test_to_logger_type_fail() -> None: + with pytest.raises(TypeError) as ex: + to_logger(cast(str, 1.5)) + assert str(ex.value) == "Invalid logger: 1.5" + + +def test_to_timeout_numeric_pass() -> None: + assert to_timeout(1.0) == 1.0 + + +def test_to_timeout_numeric_fail() -> None: + with pytest.raises(ValueError) as ex: + to_timeout(-1.0) + assert str(ex.value) == "Invalid timeout: -1.0" + + +def test_to_timeout_type_fail() -> None: + with pytest.raises(TypeError) as ex: + to_timeout(cast(float, "boom!")) + assert str(ex.value) == "Invalid timeout: 'boom!'"