Added a fixture that checks if tasks are still running in tests (#538)
Added an autouse fixture that checks whether any asyncio tasks are still running at the end of each test and fails that test if so.

Also corrected some tests that were using the event loop incorrectly.
evalott100 authored Sep 17, 2024
1 parent 5b67581 commit 989beeb
Showing 10 changed files with 196 additions and 95 deletions.
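In outline, the new check is an autouse pytest fixture that registers a finalizer; the finalizer inspects asyncio.all_tasks() on the test's event loop, cancels anything still pending, and raises so the leak is reported against the test that caused it. A condensed sketch of that idea (illustrative names only, not the exact code added to tests/conftest.py below):

# Sketch of the autouse-fixture approach; names here are illustrative.
import asyncio

import pytest


@pytest.fixture(autouse=True)
def fail_on_leaked_tasks(request: pytest.FixtureRequest):
    """Illustrative only: fail a test that leaves asyncio tasks pending."""
    loop = asyncio.get_event_loop()

    def check_for_leaked_tasks():
        # Any task still pending on the test's loop indicates a leak.
        leaked = {task for task in asyncio.all_tasks(loop) if not task.done()}
        for task in leaked:
            task.cancel()
        if leaked:
            raise RuntimeError(
                f"Tasks still running after {request.node.name}: {leaked}"
            )

    request.addfinalizer(check_for_leaked_tasks)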
2 changes: 1 addition & 1 deletion src/ophyd_async/core/_detector.py
@@ -236,7 +236,7 @@ async def _check_config_sigs(self):
     @AsyncStatus.wrap
     async def unstage(self) -> None:
         # Stop data writing.
-        await self.writer.close()
+        await asyncio.gather(self.writer.close(), self.controller.disarm())

     async def read_configuration(self) -> Dict[str, Reading]:
         return await merge_gathered_dicts(sig.read() for sig in self._config_sigs)
@@ -1,8 +1,6 @@
 import asyncio
 from typing import Optional

-from pydantic import Field
-
 from ophyd_async.core import DetectorControl, PathProvider
 from ophyd_async.core._detector import TriggerInfo

@@ -14,7 +12,7 @@ def __init__(
         self,
         pattern_generator: PatternGenerator,
         path_provider: PathProvider,
-        exposure: float = Field(default=0.1),
+        exposure: Optional[float] = 0.1,
     ) -> None:
         self.pattern_generator: PatternGenerator = pattern_generator
         self.pattern_generator.set_exposure(exposure)
@@ -46,13 +44,13 @@ async def wait_for_idle(self):
         await self.task

     async def disarm(self):
-        if self.task:
+        if self.task and not self.task.done():
             self.task.cancel()
             try:
                 await self.task
             except asyncio.CancelledError:
                 pass
-            self.task = None
+        self.task = None

     def get_deadtime(self, exposure: float | None) -> float:
         return 0.001
@@ -199,7 +199,6 @@ async def collect_stream_docs(
     def close(self) -> None:
         if self._handle_for_h5_file:
             self._handle_for_h5_file.close()
-            print("file closed")
             self._handle_for_h5_file = None

     async def observe_indices_written(
72 changes: 70 additions & 2 deletions tests/conftest.py
@@ -1,5 +1,6 @@
 import asyncio
 import os
+import pprint
 import subprocess
 import sys
 import time
@@ -8,6 +9,7 @@

 import pytest
 from bluesky.run_engine import RunEngine, TransitionError
+from pytest import FixtureRequest

 from ophyd_async.core import (
     DetectorTrigger,
@@ -58,21 +60,87 @@ def configure_epics_environment():
     os.environ["EPICS_PVA_AUTO_ADDR_LIST"] = "NO"


+_ALLOWED_PYTEST_TASKS = {"async_finalizer", "async_setup", "async_teardown"}
+
+
+def _error_and_kill_pending_tasks(
+    loop: asyncio.AbstractEventLoop, test_name: str, test_passed: bool
+) -> set[asyncio.Task]:
+    """Cancels pending tasks in the event loop for a test. Raises an exception if
+    the test hasn't already.
+
+    Args:
+        loop: The event loop to check for pending tasks.
+        test_name: The name of the test.
+        test_passed: Indicates whether the test passed.
+
+    Returns:
+        set[asyncio.Task]: The set of unfinished tasks that were cancelled.
+
+    Raises:
+        RuntimeError: If there are unfinished tasks and the test didn't fail.
+    """
+    unfinished_tasks = {
+        task
+        for task in asyncio.all_tasks(loop)
+        if task.get_coro().__name__ not in _ALLOWED_PYTEST_TASKS and not task.done()
+    }
+    for task in unfinished_tasks:
+        task.cancel()
+
+    # We only raise an exception here if the test didn't fail anyway.
+    # If it did then it makes sense that there's some tasks we need to cancel,
+    # but an exception will already have been raised.
+    if unfinished_tasks and test_passed:
+        raise RuntimeError(
+            f"Not all tasks closed during test {test_name}:\n"
+            f"{pprint.pformat(unfinished_tasks, width=88)}"
+        )
+
+    return unfinished_tasks
+
+
+@pytest.fixture(autouse=True, scope="function")
+def fail_test_on_unclosed_tasks(request: FixtureRequest):
+    """
+    Used on every test to ensure failure if there are pending tasks
+    by the end of the test.
+    """
+
+    fail_count = request.session.testsfailed
+    loop = asyncio.get_event_loop()
+    loop.set_debug(True)
+
+    request.addfinalizer(
+        lambda: _error_and_kill_pending_tasks(
+            loop, request.node.name, request.session.testsfailed == fail_count
+        )
+    )
+
+
 @pytest.fixture(scope="function")
-def RE(request):
+def RE(request: FixtureRequest):
     loop = asyncio.new_event_loop()
+    loop.set_debug(True)
     RE = RunEngine({}, call_returns_result=True, loop=loop)
+    fail_count = request.session.testsfailed

     def clean_event_loop():
         if RE.state not in ("idle", "panicked"):
             try:
                 RE.halt()
             except TransitionError:
                 pass

         loop.call_soon_threadsafe(loop.stop)
         RE._th.join()
-        loop.close()
+
+        try:
+            _error_and_kill_pending_tasks(
+                loop, request.node.name, request.session.testsfailed == fail_count
+            )
+        finally:
+            loop.close()

     request.addfinalizer(clean_event_loop)
     return RE
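For illustration only (not part of this commit), a test like the following leaves a task pending when it returns, so the finalizer registered by fail_test_on_unclosed_tasks would cancel it and raise, failing the test itself, assuming pytest-asyncio runs the test on the loop the fixture inspects:

# Hypothetical leaky test, shown only to illustrate what the fixture catches.
import asyncio


async def test_that_leaks_a_task():
    # This task is never awaited or cancelled, so it is still pending when the
    # test returns and the autouse fixture reports it as
    # "Not all tasks closed during test ...".
    asyncio.create_task(asyncio.sleep(60))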
15 changes: 12 additions & 3 deletions tests/core/test_status.py
@@ -147,9 +147,18 @@ async def test_status_propogates_traceback_under_RE(RE) -> None:


 async def test_async_status_exception_timeout():
-    st = AsyncStatus(asyncio.sleep(0.1))
-    with pytest.raises(Exception):
-        st.exception(timeout=1.0)
+    try:
+        st = AsyncStatus(asyncio.sleep(0.1))
+        with pytest.raises(
+            ValueError,
+            match=(
+                "cannot honour any timeout other than 0 in an asynchronous function"
+            ),
+        ):
+            st.exception(timeout=1.0)
+    finally:
+        if not st.done:
+            st.task.cancel()


 @pytest.fixture
44 changes: 25 additions & 19 deletions tests/epics/adcore/test_single_trigger.py
@@ -1,42 +1,48 @@
+import bluesky.plan_stubs as bps
 import bluesky.plans as bp
 import pytest
 from bluesky import RunEngine

-from ophyd_async.core import DeviceCollector, set_mock_value
+import ophyd_async.plan_stubs as ops
 from ophyd_async.epics import adcore


 @pytest.fixture
-async def single_trigger_det():
-    async with DeviceCollector(mock=True):
-        stats = adcore.NDPluginStatsIO("PREFIX:STATS")
-        det = adcore.SingleTriggerDetector(
-            drv=adcore.ADBaseIO("PREFIX:DRV"),
-            stats=stats,
-            read_uncached=[stats.unique_id],
-        )
+async def single_trigger_det_with_stats():
+    stats = adcore.NDPluginStatsIO("PREFIX:STATS", name="stats")
+    det = adcore.SingleTriggerDetector(
+        drv=adcore.ADBaseIO("PREFIX:DRV"),
+        stats=stats,
+        read_uncached=[stats.unique_id],
+        name="det",
+    )

     assert det.name == "det"
     assert stats.name == "det-stats"
-    # Set non-default values to check they are set back
-    # These are using set_mock_value to simulate the backend IOC being setup
-    # in a particular way, rather than values being set by the Ophyd signals
-    set_mock_value(det.drv.acquire_time, 0.5)
-    set_mock_value(det.drv.array_counter, 1)
-    set_mock_value(det.drv.image_mode, adcore.ImageMode.continuous)
-    set_mock_value(stats.unique_id, 3)
-    yield det
+    yield det, stats


 async def test_single_trigger_det(
-    single_trigger_det: adcore.SingleTriggerDetector, RE: RunEngine
+    single_trigger_det_with_stats: adcore.SingleTriggerDetector, RE: RunEngine
 ):
+    single_trigger_det, stats = single_trigger_det_with_stats
     names = []
     docs = []
     RE.subscribe(lambda name, _: names.append(name))
     RE.subscribe(lambda _, doc: docs.append(doc))

-    RE(bp.count([single_trigger_det]))
+    def plan():
+        yield from ops.ensure_connected(single_trigger_det, mock=True)
+        yield from bps.abs_set(single_trigger_det.drv.acquire_time, 0.5)
+        yield from bps.abs_set(single_trigger_det.drv.array_counter, 1)
+        yield from bps.abs_set(
+            single_trigger_det.drv.image_mode, adcore.ImageMode.continuous
+        )
+        # set_mock_value(stats.unique_id, 3)
+        yield from bp.count([single_trigger_det])
+
+    RE(plan())

     drv = single_trigger_det.drv
     assert 1 == await drv.acquire.get_value()
@@ -47,4 +53,4 @@ async def test_single_trigger_det(
     _, descriptor, event, _ = docs
     assert descriptor["configuration"]["det"]["data"]["det-drv-acquire_time"] == 0.5
     assert event["data"]["det-drv-array_counter"] == 1
-    assert event["data"]["det-stats-unique_id"] == 3
+    assert event["data"]["det-stats-unique_id"] == 0