Skip to content

Commit

Permalink
fix topic publishing (#195)
Browse files Browse the repository at this point in the history
copied code from before fastAPI change in service/app.py to re-establish the same subscriptions on the worker
  • Loading branch information
rosesyrett authored May 15, 2023
1 parent c57f705 commit 340de26
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from typing import Optional
from typing import Mapping, Optional

from blueapi.config import ApplicationConfig
from blueapi.core import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.messaging import StompMessagingTemplate
from blueapi.messaging.base import MessagingTemplate
from blueapi.worker.reworker import RunEngineWorker
Expand All @@ -29,19 +30,35 @@ def __init__(self, config: Optional[ApplicationConfig] = None) -> None:
def start(self) -> None:
self.worker.start()

self.worker.data_events.subscribe(
lambda event, corr_id: self.message_bus.send(
"public.worker.event.data", event, None, corr_id
)
)
self.worker.progress_events.subscribe(
lambda event, corr_id: self.message_bus.send(
"public.worker.event.progress", event, None, corr_id
)
self._publish_event_streams(
{
self.worker.worker_events: self.message_bus.destinations.topic(
"public.worker.event"
),
self.worker.progress_events: self.message_bus.destinations.topic(
"public.worker.event.progress"
),
self.worker.data_events: self.message_bus.destinations.topic(
"public.worker.event.data"
),
}
)

self.message_bus.connect()

def _publish_event_streams(
self, streams_to_destinations: Mapping[EventStream, str]
) -> None:
for stream, destination in streams_to_destinations.items():
self._publish_event_stream(stream, destination)

def _publish_event_stream(self, stream: EventStream, destination: str) -> None:
stream.subscribe(
lambda event, correlation_id: self.message_bus.send(
destination, event, None, correlation_id
)
)

def stop(self) -> None:
self.worker.stop()
self.message_bus.disconnect()
Expand Down

0 comments on commit 340de26

Please sign in to comment.