Skip to content

Commit

Permalink
WIP Use lru_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
joeshannon committed Jul 24, 2024
1 parent a387311 commit ce694ff
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 211 deletions.
195 changes: 85 additions & 110 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import logging
from collections.abc import Mapping
from typing import Any
from functools import lru_cache
from typing import Any, Protocol, runtime_checkable

from blueapi.config import ApplicationConfig
from blueapi.core.context import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.messaging.base import MessagingTemplate
from blueapi.messaging.stomptemplate import StompMessagingTemplate
from blueapi.service.model import (
DeviceModel,
PlanModel,
WorkerTask,
)
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.worker.reworker import TaskWorker
from blueapi.worker.task import Task
Expand All @@ -20,61 +18,81 @@
context and worker"""


class InitialisationException(Exception):
pass
@runtime_checkable
class CacheClearable(Protocol):
def cache_clear(self) -> None: ...


class _Singleton:
context: BlueskyContext
worker: Worker
messaging_template: MessagingTemplate | None = None
initialized = False
_CONFIG: ApplicationConfig = ApplicationConfig()


def start_worker(
config: ApplicationConfig,
bluesky_context: BlueskyContext | None = None,
worker: TaskWorker | None = None,
) -> None:
"""Creates and starts a worker with supplied config"""
if _Singleton.initialized:
raise InitialisationException(
"Worker is already running. To reload call stop first"
)
if bluesky_context is None:
_Singleton.context = BlueskyContext()
_Singleton.context.with_config(config.env)
else:
_Singleton.context = bluesky_context
def config() -> ApplicationConfig:
return _CONFIG

if worker is None:
_Singleton.worker = TaskWorker(
_Singleton.context,
broadcast_statuses=config.env.events.broadcast_status_events,
)
else:
_Singleton.worker = worker
if config.stomp is not None:
_Singleton.messaging_template = StompMessagingTemplate.autoconfigured(
config.stomp
)

# Start worker and setup events
_Singleton.worker.start()
if _Singleton.messaging_template is not None:
event_topic = _Singleton.messaging_template.destinations.topic(
"public.worker.event"
)
def set_config(new_config: ApplicationConfig):
global _CONFIG

_CONFIG = new_config


@lru_cache
def context() -> BlueskyContext:
ctx = BlueskyContext()
ctx.with_config(config().env)
return ctx


@lru_cache
def worker() -> Worker:
worker = TaskWorker(
context(),
broadcast_statuses=config().env.events.broadcast_status_events,
)
worker.start()
return worker


@lru_cache
def messaging_template() -> MessagingTemplate | None:
stomp_config = config().stomp
if stomp_config is not None:
template = StompMessagingTemplate.autoconfigured(stomp_config)

task_worker = worker()
event_topic = template.destinations.topic("public.worker.event")

_publish_event_streams(
{
_Singleton.worker.worker_events: event_topic,
_Singleton.worker.progress_events: event_topic,
_Singleton.worker.data_events: event_topic,
task_worker.worker_events: event_topic,
task_worker.progress_events: event_topic,
task_worker.data_events: event_topic,
}
)
_Singleton.messaging_template.connect()
_Singleton.initialized = True
template.connect()
return template
else:
return None


def setup(config: ApplicationConfig) -> None:
"""Creates and starts a worker with supplied config"""

set_config(config)

# Eagerly initialize worker and messaging connection

logging.basicConfig(level=config.logging.level)
worker()
messaging_template()


def teardown() -> None:
worker().stop()
if (template := messaging_template()) is not None:
template.disconnect()
for component in [context, worker, messaging_template]:
component.cache_clear()


def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -> None:
Expand All @@ -84,130 +102,87 @@ def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -

def _publish_event_stream(stream: EventStream, destination: str) -> None:
def forward_message(event: Any, correlation_id: str | None) -> None:
if _Singleton.messaging_template is not None:
_Singleton.messaging_template.send(destination, event, None, correlation_id)
if (template := messaging_template()) is not None:
template.send(destination, event, None, correlation_id)

stream.subscribe(forward_message)


def stop_worker() -> None:
if not _Singleton.initialized:
raise InitialisationException(
"Cannot stop worker as it hasn't been started yet"
)
_Singleton.initialized = False
_Singleton.worker.stop()
if (
_Singleton.messaging_template is not None
and _Singleton.messaging_template.is_connected()
):
_Singleton.messaging_template.disconnect()


def get_plans() -> list[PlanModel]:
"""Get all available plans in the BlueskyContext"""
_ensure_worker_started()
return [PlanModel.from_plan(plan) for plan in _Singleton.context.plans.values()]
return [PlanModel.from_plan(plan) for plan in context().plans.values()]


def get_plan(name: str) -> PlanModel:
"""Get plan by name from the BlueskyContext"""
_ensure_worker_started()
return PlanModel.from_plan(_Singleton.context.plans[name])
return PlanModel.from_plan(context().plans[name])


def get_devices() -> list[DeviceModel]:
"""Get all available devices in the BlueskyContext"""
_ensure_worker_started()
return [
DeviceModel.from_device(device)
for device in _Singleton.context.devices.values()
]
return [DeviceModel.from_device(device) for device in context().devices.values()]


def get_device(name: str) -> DeviceModel:
"""Retrieve device by name from the BlueskyContext"""
_ensure_worker_started()
return DeviceModel.from_device(_Singleton.context.devices[name])
return DeviceModel.from_device(context().devices[name])


def submit_task(task: Task) -> str:
"""Submit a task to be run on begin_task"""
_ensure_worker_started()
return _Singleton.worker.submit_task(task)
return worker().submit_task(task)


def clear_task(task_id: str) -> str:
"""Remove a task from the worker"""
_ensure_worker_started()
return _Singleton.worker.clear_task(task_id)
return worker().clear_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
"""Trigger a task. Will fail if the worker is busy"""
_ensure_worker_started()
if task.task_id is not None:
_Singleton.worker.begin_task(task.task_id)
worker().begin_task(task.task_id)
return task


def get_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]:
"""Retrieve a list of tasks based on their status."""
_ensure_worker_started()
return _Singleton.worker.get_tasks_by_status(status)
return worker().get_tasks_by_status(status)


def get_active_task() -> TrackableTask | None:
"""Task the worker is currently running"""
_ensure_worker_started()
return _Singleton.worker.get_active_task()
return worker().get_active_task()


def get_worker_state() -> WorkerState:
"""State of the worker"""
_ensure_worker_started()
return _Singleton.worker.state
return worker().state


def pause_worker(defer: bool | None) -> None:
"""Command the worker to pause"""
_ensure_worker_started()
_Singleton.worker.pause(defer)
worker().pause(defer)


def resume_worker() -> None:
"""Command the worker to resume"""
_ensure_worker_started()
_Singleton.worker.resume()
worker().resume()


def cancel_active_task(failure: bool, reason: str | None) -> str:
"""Remove the currently active task from the worker if there is one
Returns the task_id of the active task"""
_ensure_worker_started()
return _Singleton.worker.cancel_active_task(failure, reason)
return worker().cancel_active_task(failure, reason)


def get_tasks() -> list[TrackableTask]:
"""Return a list of all tasks on the worker,
any one of which can be triggered with begin_task"""
_ensure_worker_started()
return _Singleton.worker.get_tasks()
return worker().get_tasks()


def get_task_by_id(task_id: str) -> TrackableTask | None:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""
_ensure_worker_started()
return _Singleton.worker.get_task_by_id(task_id)


def get_state() -> bool:
"""Initialization state"""
return _Singleton.initialized


def _ensure_worker_started() -> None:
if _Singleton.initialized:
return
raise InitialisationException("Worker must be stared before it is used")
return worker().get_task_by_id(task_id)
Loading

0 comments on commit ce694ff

Please sign in to comment.