Skip to content

Commit

Permalink
Fix race condition on worker stop/start (#255)
Browse files Browse the repository at this point in the history
The worker ensures that `worker.start()` blocks until the worker is
ready to execute a task and `worker.stop()` blocks until the worker is
no longer able to execute a task. It does so using a group of
`threading.Event`s.

There were intermittent issues when events were not being cleaned up
correctly, so when the worker was restarted it was not in the correct
state. This PR fixes those and cleans up some of the thread control
code.
  • Loading branch information
callumforrester authored May 31, 2023
1 parent b78faf7 commit d0ffaaf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
32 changes: 23 additions & 9 deletions src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

LOGGER = logging.getLogger(__name__)

DEFAULT_STOP_TIMEOUT: float = 30.0
DEFAULT_START_STOP_TIMEOUT: float = 30.0


class RunEngineWorker(Worker[Task]):
Expand All @@ -45,7 +45,7 @@ class RunEngineWorker(Worker[Task]):
"""

_ctx: BlueskyContext
_stop_timeout: float
_start_stop_timeout: float

_pending_tasks: Dict[str, TrackableTask]

Expand All @@ -67,10 +67,10 @@ class RunEngineWorker(Worker[Task]):
def __init__(
self,
ctx: BlueskyContext,
stop_timeout: float = DEFAULT_STOP_TIMEOUT,
start_stop_timeout: float = DEFAULT_START_STOP_TIMEOUT,
) -> None:
self._ctx = ctx
self._stop_timeout = stop_timeout
self._start_stop_timeout = start_stop_timeout

self._pending_tasks = {}

Expand All @@ -88,6 +88,7 @@ def __init__(
self._started = Event()
self._stopping = Event()
self._stopped = Event()
self._stopped.set()

def clear_task(self, task_id: str) -> bool:
if task_id in self._pending_tasks:
Expand Down Expand Up @@ -149,21 +150,32 @@ def mark_task_as_started(event: WorkerEvent, _: Optional[str]) -> None:
def start(self) -> None:
if self._started.is_set():
raise Exception("Worker is already running")
self._wait_until_stopped()
run_worker_in_own_thread(self)
self._wait_until_started()

def stop(self) -> None:
LOGGER.info("Attempting to stop worker")

# If the worker has not yet started there is nothing to do.
if self._started.is_set():
self._task_channel.put(KillSignal())
self._stopped.wait(timeout=self._stop_timeout)
# Event timeouts do not actually raise errors
if not self._stopped.is_set():
raise TimeoutError("Did not receive successful stop signal!")
else:
LOGGER.info("Stopping worker: nothing to do")
LOGGER.info("Stopped")
self._wait_until_stopped()

def _wait_until_started(self) -> None:
if not self._started.wait(timeout=self._start_stop_timeout):
raise TimeoutError(
f"Worker did not start within {self._start_stop_timeout} seconds"
)

def _wait_until_stopped(self) -> None:
if not self._stopped.wait(timeout=self._start_stop_timeout):
raise TimeoutError(
f"Worker did not stop within {self._start_stop_timeout} seconds"
)

@property
def state(self) -> WorkerState:
Expand All @@ -175,11 +187,13 @@ def run(self) -> None:
self._ctx.run_engine.subscribe(self._on_document)
self._ctx.run_engine.waiting_hook = self._waiting_hook

self._stopped.clear()
self._started.set()
while not self._stopping.is_set():
self._cycle_with_error_handling()
self._stopped.set()
self._started.clear()
self._stopping.clear()
self._stopped.set()

def pause(self, defer=False):
LOGGER.info("Requesting to pause the worker")
Expand Down
9 changes: 8 additions & 1 deletion tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def context(fake_device: FakeDevice) -> BlueskyContext:

@pytest.fixture
def inert_worker(context: BlueskyContext) -> Worker[Task]:
return RunEngineWorker(context, stop_timeout=2.0)
return RunEngineWorker(context, start_stop_timeout=2.0)


@pytest.fixture
Expand All @@ -94,6 +94,13 @@ def test_multi_stop(inert_worker: Worker) -> None:
inert_worker.stop()


def test_restart(inert_worker: Worker) -> None:
inert_worker.start()
inert_worker.stop()
inert_worker.start()
inert_worker.stop()


def test_multi_start(inert_worker: Worker) -> None:
inert_worker.start()
with pytest.raises(Exception):
Expand Down

0 comments on commit d0ffaaf

Please sign in to comment.