Skip to content

Commit

Permalink
Add document listener (#329)
Browse files Browse the repository at this point in the history
Chagnes:

* Add CLI command to listen for documents from the message bus and print
them to the console

---------

Co-authored-by: Rose Syrett <90774497+rosesyrett@users.noreply.github.com>
  • Loading branch information
callumforrester and rosesyrett authored Nov 10, 2023
1 parent 7994906 commit 8151ca1
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 3 deletions.
15 changes: 15 additions & 0 deletions helm/blueapi/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ spec:
env:
- name: SCRATCH_AREA
value: {{ .Values.scratch.containerPath }}
{{- if .Values.listener.enabled -}}
- name: {{ .Chart.Name }}-document-listener
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
resources:
{{- toYaml .Values.listener.resources | nindent 12 }}
args:
- "-c"
- "/config/config.yaml"
{{- with .Values.existingSecret }}
- "-c"
- "/config/secret.yaml"
{{- end }}
- "listen"
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
Expand Down
4 changes: 4 additions & 0 deletions helm/blueapi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ affinity: {}

hostNetwork: false # May be needed for talking to arcane protocols such as EPICS

listener:
enabled: true
resources: {}

scratch:
hostPath: "" # example: /usr/local/blueapi-software-scratch
containerPath: /blueapi-plugins/scratch
Expand Down
14 changes: 12 additions & 2 deletions src/blueapi/cli/amq.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ def __init__(self, message: str) -> None:
super().__init__(message)


_Event = Union[WorkerEvent, ProgressEvent, DataEvent]


class AmqClient:
app: MessagingTemplate
complete: threading.Event
Expand Down Expand Up @@ -42,7 +45,8 @@ def subscribe_to_topics(
callback = BestEffortCallback()

def on_event_wrapper(
ctx: MessageContext, event: Union[WorkerEvent, ProgressEvent, DataEvent]
ctx: MessageContext,
event: _Event,
) -> None:
if isinstance(event, WorkerEvent):
if (on_event is not None) and (ctx.correlation_id == correlation_id):
Expand All @@ -55,9 +59,15 @@ def on_event_wrapper(
elif isinstance(event, DataEvent):
callback(event.name, event.doc)

self.subscribe_to_all_events(on_event_wrapper)

def subscribe_to_all_events(
self,
on_event: Callable[[MessageContext, _Event], None],
) -> None:
self.app.subscribe(
self.app.destinations.topic("public.worker.event"),
on_event_wrapper,
on_event,
)

def wait_for_complete(self, timeout: Optional[float] = None) -> None:
Expand Down
28 changes: 27 additions & 1 deletion src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from blueapi import __version__
from blueapi.cli.amq import AmqClient
from blueapi.config import ApplicationConfig, ConfigLoader
from blueapi.core import DataEvent
from blueapi.messaging import MessageContext
from blueapi.messaging.stomptemplate import StompMessagingTemplate
from blueapi.service.main import start
from blueapi.service.model import WorkerTask
Expand All @@ -21,7 +23,7 @@
print_schema_as_yaml,
write_schema_as_yaml,
)
from blueapi.worker import RunPlan, WorkerEvent, WorkerState
from blueapi.worker import ProgressEvent, RunPlan, WorkerEvent, WorkerState

from .rest import BlueapiRestClient

Expand Down Expand Up @@ -127,6 +129,30 @@ def get_devices(obj: dict) -> None:
pprint(client.get_devices().dict())


@controller.command(name="listen")
@check_connection
@click.pass_obj
def listen_to_events(obj: dict) -> None:
"""Listen to events output by blueapi"""
config: ApplicationConfig = obj["config"]
amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp))

def on_event(
context: MessageContext,
event: Union[WorkerEvent, ProgressEvent, DataEvent],
) -> None:
converted = json.dumps(event.dict(), indent=2)
print(converted)

print(
"Subscribing to all bluesky events from "
f"{config.stomp.host}:{config.stomp.port}"
)
with amq_client:
amq_client.subscribe_to_all_events(on_event)
input("Press enter to exit")


@controller.command(name="run")
@click.argument("name", type=str)
@click.argument("parameters", type=str, required=False)
Expand Down

0 comments on commit 8151ca1

Please sign in to comment.