Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace interface singleton class with cached functions #572

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 80 additions & 109 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import logging
from collections.abc import Mapping
from functools import lru_cache
from typing import Any

from blueapi.config import ApplicationConfig
from blueapi.core.context import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.messaging.base import MessagingTemplate
from blueapi.messaging.stomptemplate import StompMessagingTemplate
from blueapi.service.model import (
DeviceModel,
PlanModel,
WorkerTask,
)
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.worker.reworker import TaskWorker
from blueapi.worker.task import Task
Expand All @@ -20,61 +18,77 @@
context and worker"""


class InitialisationException(Exception):
pass
_CONFIG: ApplicationConfig = ApplicationConfig()


class _Singleton:
context: BlueskyContext
worker: Worker
messaging_template: MessagingTemplate | None = None
initialized = False
def config() -> ApplicationConfig:
return _CONFIG


def start_worker(
config: ApplicationConfig,
bluesky_context: BlueskyContext | None = None,
worker: TaskWorker | None = None,
) -> None:
"""Creates and starts a worker with supplied config"""
if _Singleton.initialized:
raise InitialisationException(
"Worker is already running. To reload call stop first"
)
if bluesky_context is None:
_Singleton.context = BlueskyContext()
_Singleton.context.with_config(config.env)
else:
_Singleton.context = bluesky_context
def set_config(new_config: ApplicationConfig):
global _CONFIG

if worker is None:
_Singleton.worker = TaskWorker(
_Singleton.context,
broadcast_statuses=config.env.events.broadcast_status_events,
)
else:
_Singleton.worker = worker
if config.stomp is not None:
_Singleton.messaging_template = StompMessagingTemplate.autoconfigured(
config.stomp
)
_CONFIG = new_config

# Start worker and setup events
_Singleton.worker.start()
if _Singleton.messaging_template is not None:
event_topic = _Singleton.messaging_template.destinations.topic(
"public.worker.event"
)

@lru_cache
def context() -> BlueskyContext:
ctx = BlueskyContext()
ctx.with_config(config().env)
return ctx


@lru_cache
def worker() -> Worker:
worker = TaskWorker(
context(),
broadcast_statuses=config().env.events.broadcast_status_events,
)
worker.start()
return worker


@lru_cache
def messaging_template() -> MessagingTemplate | None:
stomp_config = config().stomp
if stomp_config is not None:
template = StompMessagingTemplate.autoconfigured(stomp_config)

task_worker = worker()
event_topic = template.destinations.topic("public.worker.event")

_publish_event_streams(
{
_Singleton.worker.worker_events: event_topic,
_Singleton.worker.progress_events: event_topic,
_Singleton.worker.data_events: event_topic,
task_worker.worker_events: event_topic,
task_worker.progress_events: event_topic,
task_worker.data_events: event_topic,
}
)
_Singleton.messaging_template.connect()
_Singleton.initialized = True
template.connect()
return template
else:
return None


def setup(config: ApplicationConfig) -> None:
"""Creates and starts a worker with supplied config"""

set_config(config)

# Eagerly initialize worker and messaging connection

logging.basicConfig(level=config.logging.level)
worker()
messaging_template()


def teardown() -> None:
worker().stop()
if (template := messaging_template()) is not None:
template.disconnect()
context.cache_clear()
worker.cache_clear()
messaging_template.cache_clear()


def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -> None:
Expand All @@ -84,130 +98,87 @@ def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -

def _publish_event_stream(stream: EventStream, destination: str) -> None:
def forward_message(event: Any, correlation_id: str | None) -> None:
if _Singleton.messaging_template is not None:
_Singleton.messaging_template.send(destination, event, None, correlation_id)
if (template := messaging_template()) is not None:
template.send(destination, event, None, correlation_id)

stream.subscribe(forward_message)


def stop_worker() -> None:
if not _Singleton.initialized:
raise InitialisationException(
"Cannot stop worker as it hasn't been started yet"
)
_Singleton.initialized = False
_Singleton.worker.stop()
if (
_Singleton.messaging_template is not None
and _Singleton.messaging_template.is_connected()
):
_Singleton.messaging_template.disconnect()


def get_plans() -> list[PlanModel]:
"""Get all available plans in the BlueskyContext"""
_ensure_worker_started()
return [PlanModel.from_plan(plan) for plan in _Singleton.context.plans.values()]
return [PlanModel.from_plan(plan) for plan in context().plans.values()]


def get_plan(name: str) -> PlanModel:
"""Get plan by name from the BlueskyContext"""
_ensure_worker_started()
return PlanModel.from_plan(_Singleton.context.plans[name])
return PlanModel.from_plan(context().plans[name])


def get_devices() -> list[DeviceModel]:
"""Get all available devices in the BlueskyContext"""
_ensure_worker_started()
return [
DeviceModel.from_device(device)
for device in _Singleton.context.devices.values()
]
return [DeviceModel.from_device(device) for device in context().devices.values()]


def get_device(name: str) -> DeviceModel:
"""Retrieve device by name from the BlueskyContext"""
_ensure_worker_started()
return DeviceModel.from_device(_Singleton.context.devices[name])
return DeviceModel.from_device(context().devices[name])


def submit_task(task: Task) -> str:
"""Submit a task to be run on begin_task"""
_ensure_worker_started()
return _Singleton.worker.submit_task(task)
return worker().submit_task(task)


def clear_task(task_id: str) -> str:
"""Remove a task from the worker"""
_ensure_worker_started()
return _Singleton.worker.clear_task(task_id)
return worker().clear_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
"""Trigger a task. Will fail if the worker is busy"""
_ensure_worker_started()
if task.task_id is not None:
_Singleton.worker.begin_task(task.task_id)
worker().begin_task(task.task_id)
return task


def get_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]:
"""Retrieve a list of tasks based on their status."""
_ensure_worker_started()
return _Singleton.worker.get_tasks_by_status(status)
return worker().get_tasks_by_status(status)


def get_active_task() -> TrackableTask | None:
"""Task the worker is currently running"""
_ensure_worker_started()
return _Singleton.worker.get_active_task()
return worker().get_active_task()


def get_worker_state() -> WorkerState:
"""State of the worker"""
_ensure_worker_started()
return _Singleton.worker.state
return worker().state


def pause_worker(defer: bool | None) -> None:
"""Command the worker to pause"""
_ensure_worker_started()
_Singleton.worker.pause(defer)
worker().pause(defer)


def resume_worker() -> None:
"""Command the worker to resume"""
_ensure_worker_started()
_Singleton.worker.resume()
worker().resume()


def cancel_active_task(failure: bool, reason: str | None) -> str:
"""Remove the currently active task from the worker if there is one
Returns the task_id of the active task"""
_ensure_worker_started()
return _Singleton.worker.cancel_active_task(failure, reason)
return worker().cancel_active_task(failure, reason)


def get_tasks() -> list[TrackableTask]:
"""Return a list of all tasks on the worker,
any one of which can be triggered with begin_task"""
_ensure_worker_started()
return _Singleton.worker.get_tasks()
return worker().get_tasks()


def get_task_by_id(task_id: str) -> TrackableTask | None:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""
_ensure_worker_started()
return _Singleton.worker.get_task_by_id(task_id)


def get_state() -> bool:
"""Initialization state"""
return _Singleton.initialized


def _ensure_worker_started() -> None:
if _Singleton.initialized:
return
raise InitialisationException("Worker must be stared before it is used")
return worker().get_task_by_id(task_id)
6 changes: 1 addition & 5 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,8 @@ async def delete_environment(
) -> EnvironmentResponse:
"""Delete the current environment, causing internal components to be reloaded."""

def restart_runner(runner: WorkerDispatcher):
runner.stop()
runner.start()

if runner.state.initialized or runner.state.error_message is not None:
background_tasks.add_task(restart_runner, runner)
background_tasks.add_task(runner.reload)
return EnvironmentResponse(initialized=False)


Expand Down
Loading
Loading