Skip to content

Commit

Permalink
Track completed statuses (#111)
Browse files Browse the repository at this point in the history
For some devices there may be an additional status update broadcast
after the on_complete callback. This causes the status to be added
back to the dictionary where it will never be removed.

Track which statuses have been completed and check this set when
updating to ensure that a completed status is never added back to
the snapshot after on_complete is called.
  • Loading branch information
joeshannon committed May 16, 2023
1 parent aaccc3f commit c9d0584
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from functools import partial
from queue import Full, Queue
from threading import Event, RLock
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Union

from bluesky.protocols import Status

Expand Down Expand Up @@ -54,6 +54,7 @@ class RunEngineWorker(Worker[Task]):
_current: Optional[ActiveTask]
_status_lock: RLock
_status_snapshot: Dict[str, StatusView]
_completed_statuses: Set[str]
_worker_events: EventPublisher[WorkerEvent]
_progress_events: EventPublisher[ProgressEvent]
_data_events: EventPublisher[DataEvent]
Expand All @@ -79,6 +80,7 @@ def __init__(
self._data_events = EventPublisher()
self._status_lock = RLock()
self._status_snapshot = {}
self._completed_statuses = set()
self._started = Event()
self._stopping = Event()
self._stopped = Event()
Expand Down Expand Up @@ -152,6 +154,7 @@ def _cycle(self) -> None:
self._report_status()
self._errors.clear()
self._warnings.clear()
self._completed_statuses.clear()

@property
def worker_events(self) -> EventStream[WorkerEvent, int]:
Expand Down Expand Up @@ -240,6 +243,7 @@ def _monitor_status(self, status: Status) -> None:
def on_complete(status: Status) -> None:
self._on_status_event(status, status_uuid)
del self._status_snapshot[status_uuid]
self._completed_statuses.add(status_uuid)

status.add_callback(on_complete) # type: ignore

Expand Down Expand Up @@ -274,8 +278,10 @@ def _on_status_event(
time_elapsed=time_elapsed,
time_remaining=time_remaining,
)
self._status_snapshot[status_uuid] = view
self._publish_status_snapshot()
# Ensure completed statues are not re-added and published
if status_uuid not in self._completed_statuses:
self._status_snapshot[status_uuid] = view
self._publish_status_snapshot()

def _publish_status_snapshot(self) -> None:
if self._current is None:
Expand Down

0 comments on commit c9d0584

Please sign in to comment.