diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index c61e015b0..37a8d4333 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -4,7 +4,7 @@ from functools import partial from queue import Full, Queue from threading import Event, RLock -from typing import Any, Dict, Iterable, List, Mapping, Optional, Union +from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Union from bluesky.protocols import Status @@ -54,6 +54,7 @@ class RunEngineWorker(Worker[Task]): _current: Optional[ActiveTask] _status_lock: RLock _status_snapshot: Dict[str, StatusView] + _completed_statuses: Set[str] _worker_events: EventPublisher[WorkerEvent] _progress_events: EventPublisher[ProgressEvent] _data_events: EventPublisher[DataEvent] @@ -79,6 +80,7 @@ def __init__( self._data_events = EventPublisher() self._status_lock = RLock() self._status_snapshot = {} + self._completed_statuses = set() self._started = Event() self._stopping = Event() self._stopped = Event() @@ -152,6 +154,7 @@ def _cycle(self) -> None: self._report_status() self._errors.clear() self._warnings.clear() + self._completed_statuses.clear() @property def worker_events(self) -> EventStream[WorkerEvent, int]: @@ -240,6 +243,7 @@ def _monitor_status(self, status: Status) -> None: def on_complete(status: Status) -> None: self._on_status_event(status, status_uuid) del self._status_snapshot[status_uuid] + self._completed_statuses.add(status_uuid) status.add_callback(on_complete) # type: ignore @@ -274,8 +278,10 @@ def _on_status_event( time_elapsed=time_elapsed, time_remaining=time_remaining, ) - self._status_snapshot[status_uuid] = view - self._publish_status_snapshot() + # Ensure completed statues are not re-added and published + if status_uuid not in self._completed_statuses: + self._status_snapshot[status_uuid] = view + self._publish_status_snapshot() def _publish_status_snapshot(self) -> None: if self._current is None: