Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine events #224

Merged
merged 8 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/blueapi/cli/amq.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ def on_progress_event_wrapper(
self.app.destinations.topic("public.worker.event"), on_event_wrapper
)
self.app.subscribe(
self.app.destinations.topic("public.worker.event.progress"),
self.app.destinations.topic("public.worker.event"),
on_progress_event_wrapper,
)

# self.app.send("worker.run", {"name": name, "params": params})
task_response = self.app.send_and_receive(
"worker.run", {"name": name, "params": params}, reply_type=TaskResponse
Expand Down
4 changes: 2 additions & 2 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ def start(self) -> None:
"public.worker.event"
),
self.worker.progress_events: self.messaging_template.destinations.topic(
"public.worker.event.progress"
"public.worker.event"
),
self.worker.data_events: self.messaging_template.destinations.topic(
"public.worker.event.data"
"public.worker.event"
),
}
)
Expand Down
113 changes: 109 additions & 4 deletions tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import itertools
import threading
from concurrent.futures import Future
from typing import Callable, Iterable, List, Optional, TypeVar
from typing import Any, Callable, Iterable, List, Optional, TypeVar, Union

import pytest

from blueapi.config import EnvironmentConfig, Source, SourceKind
from blueapi.core import BlueskyContext, EventStream
from blueapi.core.bluesky_types import DataEvent
from blueapi.worker import (
ProgressEvent,
RunEngineWorker,
Expand Down Expand Up @@ -264,20 +265,86 @@ def begin_task_and_wait_until_complete(
#


def test_worker_and_data_events_produce_in_order(worker: Worker) -> None:
assert_running_count_plan_produces_ordered_worker_and_data_events(
[
WorkerEvent(
state=WorkerState.RUNNING,
task_status=TaskStatus(
task_id="count", task_complete=False, task_failed=False
),
errors=[],
warnings=[],
),
DataEvent(name="start", doc={}),
DataEvent(name="descriptor", doc={}),
DataEvent(name="event", doc={}),
DataEvent(name="stop", doc={}),
WorkerEvent(
state=WorkerState.IDLE,
task_status=TaskStatus(
task_id="count", task_complete=False, task_failed=False
),
errors=[],
warnings=[],
),
WorkerEvent(
state=WorkerState.IDLE,
task_status=TaskStatus(
task_id="count", task_complete=True, task_failed=False
),
errors=[],
warnings=[],
),
],
worker,
)


def assert_running_count_plan_produces_ordered_worker_and_data_events(
expected_events: List[Union[WorkerEvent, DataEvent]],
worker: Worker,
task: Task = RunPlan(name="count", params={"detectors": ["image_det"], "num": 1}),
timeout: float = 5.0,
) -> None:
event_streams: List[EventStream[Any, int]] = [
worker.data_events,
worker.worker_events,
]

count = itertools.count()
events: "Future[List[Any]]" = take_events_from_streams(
event_streams,
lambda _: next(count) >= len(expected_events) - 1,
)

task_id = worker.submit_task(task)
worker.begin_task(task_id)
results = events.result(timeout=timeout)

for actual, expected in itertools.zip_longest(results, expected_events):
if isinstance(expected, WorkerEvent):
if expected.task_status:
expected.task_status.task_id = task_id
assert actual == expected
elif isinstance(expected, DataEvent):
assert isinstance(actual, DataEvent)
assert actual.name == expected.name


E = TypeVar("E")
S = TypeVar("S")


def take_n_events(
stream: EventStream[E, S],
stream: EventStream[E, Any],
num: int,
) -> "Future[List[E]]":
count = itertools.count()
return take_events(stream, lambda _: next(count) >= num)


def take_events(
stream: EventStream[E, S],
stream: EventStream[E, Any],
cutoff_predicate: Callable[[E], bool],
) -> "Future[List[E]]":
events: List[E] = []
Expand All @@ -291,3 +358,41 @@ def on_event(event: E, event_id: Optional[str]) -> None:
sub = stream.subscribe(on_event)
future.add_done_callback(lambda _: stream.unsubscribe(sub))
return future


def take_events_from_streams(
streams: List[EventStream[Any, int]],
cutoff_predicate: Callable[[Any], bool],
) -> "Future[List[Any]]":
"""Returns a collated list of futures for events in numerous event streams.

The support for generic and algebraic types doesn't appear to extend to
taking an arbitrary list of concrete types with single but differing
generic arguments while also maintaining the generality of the argument
types.

The type for streams will be any combination of event streams each of a
given event type, where the event type is generic:

List[
Union[
EventStream[WorkerEvent, int],
EventStream[DataEvent, int],
EventStream[ProgressEvent, int]
]
]

"""
events: List[Any] = []
future: "Future[List[Any]]" = Future()

def on_event(event: Any, event_id: Optional[str]) -> None:
print(event)
events.append(event)
if cutoff_predicate(event):
future.set_result(events)

for stream in streams:
sub = stream.subscribe(on_event)
future.add_done_callback(lambda _: stream.unsubscribe(sub))
return future