From c9d0584bb62cfa571d759d45319734ee320c2ca5 Mon Sep 17 00:00:00 2001 From: Joe Shannon Date: Mon, 15 May 2023 17:12:09 +0100 Subject: [PATCH] Track completed statuses (#111) For some devices there may be an additional status update broadcast after the on_complete callback. This causes the status to be added back to the dictionary where it will never be removed. Track which statuses have been completed and check this set when updating to ensure that a completed status is never added back to the snapshot after on_complete is called. --- src/blueapi/worker/reworker.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index c61e015b0..37a8d4333 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -4,7 +4,7 @@ from functools import partial from queue import Full, Queue from threading import Event, RLock -from typing import Any, Dict, Iterable, List, Mapping, Optional, Union +from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Union from bluesky.protocols import Status @@ -54,6 +54,7 @@ class RunEngineWorker(Worker[Task]): _current: Optional[ActiveTask] _status_lock: RLock _status_snapshot: Dict[str, StatusView] + _completed_statuses: Set[str] _worker_events: EventPublisher[WorkerEvent] _progress_events: EventPublisher[ProgressEvent] _data_events: EventPublisher[DataEvent] @@ -79,6 +80,7 @@ def __init__( self._data_events = EventPublisher() self._status_lock = RLock() self._status_snapshot = {} + self._completed_statuses = set() self._started = Event() self._stopping = Event() self._stopped = Event() @@ -152,6 +154,7 @@ def _cycle(self) -> None: self._report_status() self._errors.clear() self._warnings.clear() + self._completed_statuses.clear() @property def worker_events(self) -> EventStream[WorkerEvent, int]: @@ -240,6 +243,7 @@ def _monitor_status(self, status: Status) -> None: def on_complete(status: Status) -> None: self._on_status_event(status, status_uuid) del self._status_snapshot[status_uuid] + self._completed_statuses.add(status_uuid) status.add_callback(on_complete) # type: ignore @@ -274,8 +278,10 @@ def _on_status_event( time_elapsed=time_elapsed, time_remaining=time_remaining, ) - self._status_snapshot[status_uuid] = view - self._publish_status_snapshot() + # Ensure completed statues are not re-added and published + if status_uuid not in self._completed_statuses: + self._status_snapshot[status_uuid] = view + self._publish_status_snapshot() def _publish_status_snapshot(self) -> None: if self._current is None: