Skip to content

Commit

Permalink
simplification refactor - canceling the worker abstract class (#573)
Browse files Browse the repository at this point in the history
rationale in the issue
  • Loading branch information
stan-dot authored Aug 5, 2024
1 parent 8733c72 commit 6d0b50f
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 277 deletions.
5 changes: 2 additions & 3 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
from blueapi.messaging.stomptemplate import StompMessagingTemplate
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.worker.reworker import TaskWorker
from blueapi.worker.task import Task
from blueapi.worker.worker import TrackableTask, Worker
from blueapi.worker.task_worker import TaskWorker, TrackableTask

"""This module provides interface between web application and underlying Bluesky
context and worker"""
Expand All @@ -39,7 +38,7 @@ def context() -> BlueskyContext:


@lru_cache
def worker() -> Worker:
def worker() -> TaskWorker:
worker = TaskWorker(
context(),
broadcast_statuses=config().env.events.broadcast_status_events,
Expand Down
6 changes: 3 additions & 3 deletions src/blueapi/service/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from blueapi.core import BLUESKY_PROTOCOLS, Device, Plan
from blueapi.utils import BlueapiBaseModel
from blueapi.worker import Worker, WorkerState
from blueapi.worker.worker import TrackableTask
from blueapi.worker import WorkerState
from blueapi.worker.task_worker import TaskWorker, TrackableTask

_UNKNOWN_NAME = "UNKNOWN"

Expand Down Expand Up @@ -114,7 +114,7 @@ class WorkerTask(BlueapiBaseModel):
)

@classmethod
def of_worker(cls, worker: Worker) -> "WorkerTask":
def of_worker(cls, worker: TaskWorker) -> "WorkerTask":
active = worker.get_active_task()
if active is not None:
return WorkerTask(task_id=active.task_id)
Expand Down
5 changes: 1 addition & 4 deletions src/blueapi/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from .event import ProgressEvent, StatusView, TaskStatus, WorkerEvent, WorkerState
from .multithread import run_worker_in_own_thread
from .reworker import TaskWorker
from .task import Task
from .worker import TrackableTask, Worker
from .task_worker import TaskWorker, TrackableTask
from .worker_errors import WorkerAlreadyStartedError, WorkerBusyError

__all__ = [
"run_worker_in_own_thread",
"TaskWorker",
"Task",
"Worker",
Expand Down
50 changes: 0 additions & 50 deletions src/blueapi/worker/multithread.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import logging
import uuid
from collections.abc import Iterable, Mapping
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from functools import partial
from queue import Full, Queue
from threading import Event, RLock
from typing import Any
from typing import Any, Generic, TypeVar

from bluesky.protocols import Status
from pydantic import Field
from super_state_machine.errors import TransitionError

from blueapi.core import (
Expand All @@ -17,6 +19,9 @@
EventStream,
WatchableStatus,
)
from blueapi.core.bluesky_event_loop import configure_bluesky_event_loop
from blueapi.utils.base_model import BlueapiBaseModel
from blueapi.utils.thread_exception import handle_all_exceptions

from .event import (
ProgressEvent,
Expand All @@ -27,17 +32,29 @@
WorkerEvent,
WorkerState,
)
from .multithread import run_worker_in_own_thread
from .task import Task
from .worker import TrackableTask, Worker
from .worker_errors import WorkerAlreadyStartedError, WorkerBusyError

LOGGER = logging.getLogger(__name__)

DEFAULT_START_STOP_TIMEOUT: float = 30.0

T = TypeVar("T")

class TaskWorker(Worker[Task]):

class TrackableTask(BlueapiBaseModel, Generic[T]):
"""
A representation of a task that the worker recognizes
"""

task_id: str
task: T
is_complete: bool = False
is_pending: bool = True
errors: list[str] = Field(default_factory=list)


class TaskWorker:
"""
Worker wrapping BlueskyContext that can work in its own thread/process
Expand Down Expand Up @@ -412,3 +429,41 @@ class KillSignal:
"""

...


def run_worker_in_own_thread(
worker: TaskWorker, executor: ThreadPoolExecutor | None = None
) -> Future:
"""
Helper function, make a worker run in a new thread managed by a ThreadPoolExecutor
Args:
worker (TaskWorker): The worker to run
executor (Optional[ThreadPoolExecutor], optional): The executor to manage the
thread, defaults to None in
which case a new one is
created
Returns:
Future: Future representing worker stopping
"""

if executor is None:
executor = ThreadPoolExecutor(1, "run-engine-worker")
return executor.submit(_run_worker_thread, worker)


@handle_all_exceptions
def _run_worker_thread(worker: TaskWorker) -> None:
"""
Helper function, run a worker forever, includes support for
printing exceptions to stdout from a non-main thread.
Args:
worker (TaskWorker): The worker to run
"""

LOGGER.info("Setting up event loop")
configure_bluesky_event_loop()
LOGGER.info("Worker starting")
worker.run()
186 changes: 0 additions & 186 deletions src/blueapi/worker/worker.py

This file was deleted.

Loading

0 comments on commit 6d0b50f

Please sign in to comment.