From aab9c0e686b2e25f0e86b48f16182e689d21f4ef Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Wed, 31 May 2023 10:19:48 +0100 Subject: [PATCH 1/6] Fix race condition on starting worker --- src/blueapi/worker/reworker.py | 3 +-- src/blueapi/worker/task.py | 45 ++++++++++++++++------------------ 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index fa23a48c8..51bb75b77 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -114,8 +114,7 @@ def begin_task(self, task_id: str) -> None: raise KeyError(f"No pending task with ID {task_id}") def submit_task(self, task: Task) -> str: - if isinstance(task, RunPlan): - task.set_clean_params(_lookup_params(self._ctx, task)) + task.prepare_params(self._ctx) task_id: str = str(uuid.uuid4()) trackable_task = TrackableTask(task_id=task_id, task=task) self._pending_tasks[task_id] = trackable_task diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py index ed59fc485..51167e894 100644 --- a/src/blueapi/worker/task.py +++ b/src/blueapi/worker/task.py @@ -7,27 +7,10 @@ from blueapi.core import BlueskyContext from blueapi.utils import BlueapiBaseModel - -# TODO: Make a TaggedUnion -class Task(ABC, BlueapiBaseModel): - """ - Object that can run with a TaskContext - """ - - @abstractmethod - def do_task(self, __ctx: BlueskyContext) -> None: - """ - Perform the task using the context - - Args: - ctx: Context for the task, holds plans/device/etc - """ - - LOGGER = logging.getLogger(__name__) -class RunPlan(Task): +class Task(BlueapiBaseModel): """ Task that will run a plan """ @@ -36,21 +19,35 @@ class RunPlan(Task): params: Mapping[str, Any] = Field( description="Values for parameters to plan, if any", default_factory=dict ) - _sanitized_params: Optional[BaseModel] = Field(default=None) + _prepared_params: Optional[BaseModel] = None - def set_clean_params(self, model: BaseModel): - self._sanitized_params = model + def prepare_params(self, ctx: BlueskyContext) -> None: + self._ensure_params(ctx) def do_task(self, ctx: BlueskyContext) -> None: LOGGER.info(f"Asked to run plan {self.name} with {self.params}") func = ctx.plan_functions[self.name] - sanitized_params = self._sanitized_params or _lookup_params(ctx, self) - plan_generator = func(**sanitized_params.dict()) + prepared_params = self._ensure_params(ctx) + plan_generator = func(**prepared_params.dict()) ctx.run_engine(plan_generator) + def _ensure_params(self, ctx: BlueskyContext) -> BaseModel: + if self._prepared_params is None: + self._prepared_params = _lookup_params(ctx, self) + return self._prepared_params + + +class RunPlan(Task): + """ + Here for backward compatibility pending + https://github.com/DiamondLightSource/blueapi/issues/253 + """ + + ... + -def _lookup_params(ctx: BlueskyContext, task: RunPlan) -> BaseModel: +def _lookup_params(ctx: BlueskyContext, task: Task) -> BaseModel: """ Checks plan parameters against context From 4876c9c4628a5dab63d472fd63ae3d22a6e2133d Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Wed, 31 May 2023 15:01:50 +0100 Subject: [PATCH 2/6] Begin refactoring RunEngineWorker, exposed race condition when stopping --- src/blueapi/worker/reworker.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 51bb75b77..82d454d5e 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -1,4 +1,5 @@ import logging +import time import uuid from dataclasses import dataclass from functools import partial @@ -176,6 +177,9 @@ def _wait_until_stopped(self) -> None: f"Worker did not stop within {self._start_stop_timeout} seconds" ) + if not self._stopped.wait(timeout=self._stop_timeout): + raise TimeoutError("Did not receive successful stop signal!") + @property def state(self) -> WorkerState: return self._state From 61a6765d413afed101d631f9e4d52b87c5de81be Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Thu, 1 Jun 2023 07:42:24 +0100 Subject: [PATCH 3/6] Remove unused import --- src/blueapi/worker/reworker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 82d454d5e..9fe1f228b 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -1,5 +1,4 @@ import logging -import time import uuid from dataclasses import dataclass from functools import partial From 8db3899a691012fe0aa8802382d1069bb8186fd1 Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Thu, 1 Jun 2023 07:43:02 +0100 Subject: [PATCH 4/6] Fix all imports --- src/blueapi/worker/reworker.py | 2 +- src/blueapi/worker/task.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 9fe1f228b..a1031d1f6 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -25,7 +25,7 @@ WorkerState, ) from .multithread import run_worker_in_own_thread -from .task import RunPlan, Task, _lookup_params +from .task import Task from .worker import TrackableTask, Worker from .worker_busy_error import WorkerBusyError diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py index 51167e894..9433a7e97 100644 --- a/src/blueapi/worker/task.py +++ b/src/blueapi/worker/task.py @@ -1,5 +1,4 @@ import logging -from abc import ABC, abstractmethod from typing import Any, Mapping, Optional from pydantic import BaseModel, Field From dd443a52855850b4d1ea28ed647c995086c11b3a Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Thu, 1 Jun 2023 07:44:12 +0100 Subject: [PATCH 5/6] Fix double timeout in waiting for stop signal --- src/blueapi/worker/reworker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index a1031d1f6..676d234d4 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -176,9 +176,6 @@ def _wait_until_stopped(self) -> None: f"Worker did not stop within {self._start_stop_timeout} seconds" ) - if not self._stopped.wait(timeout=self._stop_timeout): - raise TimeoutError("Did not receive successful stop signal!") - @property def state(self) -> WorkerState: return self._state From 824d70d1cd9362900b9d0f6a3ec2d4de2c488052 Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Thu, 1 Jun 2023 07:46:26 +0100 Subject: [PATCH 6/6] Fix docstring of RunPlan so it does not change schema --- src/blueapi/worker/task.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py index 9433a7e97..d09add5c7 100644 --- a/src/blueapi/worker/task.py +++ b/src/blueapi/worker/task.py @@ -37,10 +37,11 @@ def _ensure_params(self, ctx: BlueskyContext) -> BaseModel: return self._prepared_params +# Here for backward compatibility pending +# https://github.com/DiamondLightSource/blueapi/issues/253 class RunPlan(Task): """ - Here for backward compatibility pending - https://github.com/DiamondLightSource/blueapi/issues/253 + Task that will run a plan """ ...