Skip to content

Commit

Permalink
Begin writing tests for worker
Browse files Browse the repository at this point in the history
  • Loading branch information
callumforrester committed Apr 28, 2023
1 parent d908fe5 commit dfb159f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/blueapi/worker/multithread.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ def _run_worker_thread(worker: Worker[T]) -> None:

LOGGER.info("Worker starting")
# TODO: Use API methods only!
worker._run_forever()
worker.run()
2 changes: 1 addition & 1 deletion src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def stop(self) -> None:
self._task_queue.put(KillSignal())
self._stopped.wait(timeout=30.0)

def _run_forever(self) -> None:
def run(self) -> None:
LOGGER.info("Worker starting")
self._ctx.run_engine.state_hook = self._on_state_change
self._ctx.run_engine.subscribe(self._on_document)
Expand Down
8 changes: 4 additions & 4 deletions src/blueapi/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ def start(self) -> None:
"""

@abstractmethod
def stop(self) -> None:
def run(self) -> None:
"""
Stop worker
Run worker, blocks
"""

@abstractmethod
def block(self) -> None:
def stop(self) -> None:
"""
Block while worker is running
Stop worker
"""

@property
Expand Down
43 changes: 43 additions & 0 deletions tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from concurrent.futures import Future
from threading import Event

import pytest

from blueapi.core import BlueskyContext
from blueapi.worker import RunEngineWorker, RunPlan, Worker, WorkerEvent, WorkerState


@pytest.fixture
def context() -> BlueskyContext:
ctx = BlueskyContext()
ctx.with_startup_script("blueapi.startup.example")
return ctx


@pytest.fixture
def worker(context: BlueskyContext) -> Worker:
worker = RunEngineWorker(context)
yield worker
worker.stop()


def test_stop(worker: Worker) -> None:
worker.start()


def test_submit(worker: Worker) -> None:
worker.start()
started = Future()

def process_event(event: WorkerEvent, task_id: str) -> None:
started.set_result(event)

worker.worker_events.subscribe(process_event)
worker.submit_task(
"test",
RunPlan(
name="sleep",
params={"time": 0.1},
),
)
assert started.result(timeout=5.0).status.state is WorkerState.RUNNING

0 comments on commit dfb159f

Please sign in to comment.