From 9c1003edfa249fa29b55ef2f787078d6e7f2ce22 Mon Sep 17 00:00:00 2001 From: ahiuchingau <20424172+ahiuchingau@users.noreply.github.com> Date: Mon, 10 Jul 2023 17:00:29 -0400 Subject: [PATCH] rebase is going to be interesting --- .../move_execution/__init__.py | 11 + .../move_execution/move_group_runner.py | 169 ++++++ .../move_scheduler.py} | 501 +++++++----------- 3 files changed, 364 insertions(+), 317 deletions(-) create mode 100644 hardware/opentrons_hardware/hardware_control/move_execution/__init__.py create mode 100644 hardware/opentrons_hardware/hardware_control/move_execution/move_group_runner.py rename hardware/opentrons_hardware/hardware_control/{move_group_runner.py => move_execution/move_scheduler.py} (63%) diff --git a/hardware/opentrons_hardware/hardware_control/move_execution/__init__.py b/hardware/opentrons_hardware/hardware_control/move_execution/__init__.py new file mode 100644 index 000000000000..2aa684789faf --- /dev/null +++ b/hardware/opentrons_hardware/hardware_control/move_execution/__init__.py @@ -0,0 +1,11 @@ +"""Move Execution package.""" + +from .move_group_runner import MoveGroupRunner +from .move_scheduler import MoveScheduler, MoveDispatcher + + +__all__ = [ + "MoveGroupRunner", + "MoveScheduler", + "MoveDispatcher" +] diff --git a/hardware/opentrons_hardware/hardware_control/move_execution/move_group_runner.py b/hardware/opentrons_hardware/hardware_control/move_execution/move_group_runner.py new file mode 100644 index 000000000000..7fc92a373f25 --- /dev/null +++ b/hardware/opentrons_hardware/hardware_control/move_execution/move_group_runner.py @@ -0,0 +1,169 @@ +"""Class that executes motion on can bus.""" +import asyncio +from collections import defaultdict +import logging +from typing import List, Tuple + +from opentrons_hardware.firmware_bindings import ArbitrationId +from opentrons_hardware.firmware_bindings.constants import ( + NodeId, + ErrorCode, + MotorPositionFlags, +) +from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger +from opentrons_hardware.firmware_bindings.messages.message_definitions import ( + ClearAllMoveGroupsRequest, + TipActionResponse, +) +from opentrons_hardware.firmware_bindings.messages.payloads import EmptyPayload + +from opentrons_hardware.hardware_control.motion import MoveGroups +from .move_scheduler import ( + _Completions, + MoveDispatcher, + MoveScheduler, +) + +from opentrons_hardware.hardware_control.types import NodeDict + +log = logging.getLogger(__name__) + + +class MoveGroupRunner: + """A move command runner.""" + + def __init__( + self, + move_groups: MoveGroups, + start_at_index: int = 0, + ignore_stalls: bool = False, + ) -> None: + """Constructor. + + Args: + move_groups: The move groups to run. + start_at_index: The index the MoveGroupManager will start at + ignore_stalls: Depends on the disableStallDetection feature flag + """ + self._move_scheduler = MoveScheduler(move_groups, start_at_index, ignore_stalls) + self._is_prepped: bool = False + + async def prep(self, can_messenger: CanMessenger) -> None: + """Prepare the move group. The first thing that happens during run(). + + prep() and execute() can be used to replace a single call to run() to + ensure tighter timing, if you want something else to start as soon as + possible to the actual execution of the move. + """ + if not self._move_scheduler.has_moves(): + log.debug("No moves. Nothing to do.") + return + await self._clear_groups(can_messenger) + await self._move_scheduler.schedule_groups(can_messenger) + self._is_prepped = True + + async def execute( + self, can_messenger: CanMessenger + ) -> NodeDict[Tuple[float, float, bool, bool]]: + """Execute a pre-prepared move group. The second thing that run() does. + + prep() and execute() can be used to replace a single call to run() to + ensure tighter timing, if you want something else to start as soon as + possible to the actual execution of the move. + """ + if not self._move_scheduler.has_moves(): + log.debug("No moves. Nothing to do.") + return {} + if not self._is_prepped: + raise RuntimeError("A group must be prepped before it can be executed.") + try: + move_completion_data = await self._move(can_messenger) + except (RuntimeError, asyncio.TimeoutError): + log.error("raising error from Move group runner") + raise + return self._accumulate_move_completions(move_completion_data) + + async def run( + self, can_messenger: CanMessenger + ) -> NodeDict[Tuple[float, float, bool, bool]]: + """Run the move group. + + Args: + can_messenger: a can messenger + + Returns: + The current position after the move for all the axes that + acknowledged completing moves. + + This function first prepares all connected devices to move (by sending + all the data for the moves over) and then executes the move with a + single call. + + prep() and execute() can be used to replace a single call to run() to + ensure tighter timing, if you want something else to start as soon as + possible to the actual execution of the move. + """ + await self.prep(can_messenger) + return await self.execute(can_messenger) + + @staticmethod + def _accumulate_move_completions( + completions: _Completions, + ) -> NodeDict[Tuple[float, float, bool, bool]]: + position: NodeDict[ + List[Tuple[Tuple[int, int], float, float, bool, bool]] + ] = defaultdict(list) + for arbid, completion in completions: + if isinstance(completion, TipActionResponse): + continue + position[NodeId(arbid.parts.originating_node_id)].append( + ( + ( + completion.payload.group_id.value, + completion.payload.seq_id.value, + ), + float(completion.payload.current_position_um.value) / 1000.0, + float(completion.payload.encoder_position_um.value) / 1000.0, + bool( + completion.payload.position_flags.value + & MotorPositionFlags.stepper_position_ok.value + ), + bool( + completion.payload.position_flags.value + & MotorPositionFlags.encoder_position_ok.value + ), + ) + ) + # for each node, pull the position from the completion with the largest + # combination of group id and sequence id + return { + node: next( + reversed( + sorted(poslist, key=lambda position_element: position_element[0]) + ) + )[1:] + for node, poslist in position.items() + } + + async def _clear_groups(self, can_messenger: CanMessenger) -> None: + """Send commands to clear the message groups. + + Args: + can_messenger: a can messenger + """ + error = await can_messenger.ensure_send( + node_id=NodeId.broadcast, + message=ClearAllMoveGroupsRequest(payload=EmptyPayload()), + ) + if error != ErrorCode.ok: + log.warning("Clear move group failed") + + async def _move(self, can_messenger: CanMessenger) -> _Completions: + """Run all the move groups.""" + dispatcher: MoveDispatcher = self._move_scheduler.build_distpatcher() + try: + can_messenger.add_listener(dispatcher) + completions = await dispatcher.run(can_messenger) + finally: + can_messenger.remove_listener(dispatcher) + return completions diff --git a/hardware/opentrons_hardware/hardware_control/move_group_runner.py b/hardware/opentrons_hardware/hardware_control/move_execution/move_scheduler.py similarity index 63% rename from hardware/opentrons_hardware/hardware_control/move_group_runner.py rename to hardware/opentrons_hardware/hardware_control/move_execution/move_scheduler.py index dcca68d691cc..85b2617ccb45 100644 --- a/hardware/opentrons_hardware/hardware_control/move_group_runner.py +++ b/hardware/opentrons_hardware/hardware_control/move_execution/move_scheduler.py @@ -1,9 +1,8 @@ -"""Class that schedules motion on can bus.""" +"""Class that schedule motion on can bus.""" import asyncio -from collections import defaultdict from dataclasses import dataclass, field import logging -from typing import List, Set, Tuple, Iterator, Union, Optional, Dict +from typing import List, Set, Tuple, Iterator, Union, Optional import numpy as np import time @@ -11,7 +10,6 @@ from opentrons_hardware.firmware_bindings.constants import ( NodeId, ErrorCode, - MotorPositionFlags, ErrorSeverity, GearMotorId, MoveAckId, @@ -19,7 +17,6 @@ from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger from opentrons_hardware.firmware_bindings.messages import MessageDefinition from opentrons_hardware.firmware_bindings.messages.message_definitions import ( - ClearAllMoveGroupsRequest, AddLinearMoveRequest, MoveCompleted, ExecuteMoveGroupRequest, @@ -40,13 +37,12 @@ TipActionRequestPayload, EmptyPayload, ) -from .constants import ( +from opentrons_hardware.hardware_control.constants import ( INTERRUPTS_PER_SEC, TIP_INTERRUPTS_PER_SEC, BRUSHED_MOTOR_INTERRUPTS_PER_SEC, ) from opentrons_hardware.hardware_control.motion import ( - MoveGroup, MoveGroups, MoveGroupSingleAxisStep, MoveGroupSingleGripperStep, @@ -62,21 +58,22 @@ from opentrons_hardware.firmware_bindings.messages.fields import ( PipetteTipActionTypeField, MoveStopConditionField, + MoveAckIdField, ) from opentrons_hardware.hardware_control.motion import MoveStopCondition from opentrons_hardware.hardware_control.motion_planning.move_utils import ( MoveConditionNotMet, ) -from .types import NodeDict +from opentrons_hardware.hardware_control.types import NodeDict log = logging.getLogger(__name__) +_SchedulableRequests = Union[AddLinearMoveRequest, HomeRequest, GripperGripRequest, GripperHomeRequest, AddBrushedLinearMoveRequest, TipActionRequest] _AcceptableMoves = Union[MoveCompleted, TipActionResponse] _CompletionPacket = Tuple[ArbitrationId, _AcceptableMoves] _Completions = List[_CompletionPacket] - @dataclass(order=True) class ScheduledMove: @@ -86,300 +83,26 @@ class ScheduledMove: duration: float tip_action_motors: List[GearMotorId] = field(default_factory=list) - def is_done(self): + def is_done(self) -> bool: return True if not self.tip_action_motors else False - def reject_ack(self, ack: MoveAckId): - if ack not in MoveStopCondition.acceptable_ack(self.stop_condition): + def reject_ack(self, ack: UInt8Field) -> bool: + if MoveAckId not in MoveStopCondition.acceptable_ack(self.stop_condition): log.error(f"Move failed: condition {self.stop_condition} not met") return True return False - - _MoveGroupInfo = Set[ScheduledMove] -class MoveGroupRunner: - """A move command scheduler.""" - - def __init__( - self, - move_groups: MoveGroups, - start_at_index: int = 0, - ignore_stalls: bool = False, - ) -> None: - """Constructor. - - Args: - move_groups: The move groups to run. - start_at_index: The index the MoveGroupManager will start at - ignore_stalls: Depends on the disableStallDetection feature flag - """ - self._move_groups = move_groups - self._start_at_index = start_at_index - self._ignore_stalls = ignore_stalls - self._is_prepped: bool = False - - @staticmethod - def _has_moves(move_groups: MoveGroups, start_index: int = 0) -> bool: - for move_group in move_groups[start_index:]: - for move in move_group: - for node, step in move.items(): - return True - return False - - async def prep(self, can_messenger: CanMessenger) -> None: - """Prepare the move group. The first thing that happens during run(). - - prep() and execute() can be used to replace a single call to run() to - ensure tighter timing, if you want something else to start as soon as - possible to the actual execution of the move. - """ - if not self._has_moves(self._move_groups, self._start_at_index): - log.debug("No moves. Nothing to do.") - return - await self._clear_groups(can_messenger) - await self._send_groups(can_messenger) - self._is_prepped = True - - async def execute( - self, can_messenger: CanMessenger - ) -> NodeDict[Tuple[float, float, bool, bool]]: - """Execute a pre-prepared move group. The second thing that run() does. - - prep() and execute() can be used to replace a single call to run() to - ensure tighter timing, if you want something else to start as soon as - possible to the actual execution of the move. - """ - if not self._has_moves(self._move_groups, self._start_at_index): - log.debug("No moves. Nothing to do.") - return {} - if not self._is_prepped: - raise RuntimeError("A group must be prepped before it can be executed.") - try: - move_completion_data = await self._move(can_messenger, self._start_at_index) - except (RuntimeError, asyncio.TimeoutError): - log.error("raising error from Move group runner") - raise - return self._accumulate_move_completions(move_completion_data) - - async def run( - self, can_messenger: CanMessenger - ) -> NodeDict[Tuple[float, float, bool, bool]]: - """Run the move group. - - Args: - can_messenger: a can messenger - - Returns: - The current position after the move for all the axes that - acknowledged completing moves. - - This function first prepares all connected devices to move (by sending - all the data for the moves over) and then executes the move with a - single call. - - prep() and execute() can be used to replace a single call to run() to - ensure tighter timing, if you want something else to start as soon as - possible to the actual execution of the move. - """ - await self.prep(can_messenger) - return await self.execute(can_messenger) - - @staticmethod - def _accumulate_move_completions( - completions: _Completions, - ) -> NodeDict[Tuple[float, float, bool, bool]]: - position: NodeDict[ - List[Tuple[Tuple[int, int], float, float, bool, bool]] - ] = defaultdict(list) - for arbid, completion in completions: - if isinstance(completion, TipActionResponse): - continue - position[NodeId(arbid.parts.originating_node_id)].append( - ( - ( - completion.payload.group_id.value, - completion.payload.seq_id.value, - ), - float(completion.payload.current_position_um.value) / 1000.0, - float(completion.payload.encoder_position_um.value) / 1000.0, - bool( - completion.payload.position_flags.value - & MotorPositionFlags.stepper_position_ok.value - ), - bool( - completion.payload.position_flags.value - & MotorPositionFlags.encoder_position_ok.value - ), - ) - ) - # for each node, pull the position from the completion with the largest - # combination of group id and sequence id - return { - node: next( - reversed( - sorted(poslist, key=lambda position_element: position_element[0]) - ) - )[1:] - for node, poslist in position.items() - } - - async def _clear_groups(self, can_messenger: CanMessenger) -> None: - """Send commands to clear the message groups. - - Args: - can_messenger: a can messenger - """ - error = await can_messenger.ensure_send( - node_id=NodeId.broadcast, - message=ClearAllMoveGroupsRequest(payload=EmptyPayload()), - ) - if error != ErrorCode.ok: - log.warning("Clear move group failed") - - async def _send_groups(self, can_messenger: CanMessenger) -> None: - """Send commands to set up the message groups.""" - for group_i, group in enumerate(self._move_groups): - for seq_i, sequence in enumerate(group): - for node, step in sequence.items(): - await can_messenger.send( - node_id=node, - message=self.get_message_type( - step, group_i + self._start_at_index, seq_i, self._ignore_stalls - ), - ) - - @classmethod - def convert_velocity( - cls, velocity: Union[float, np.float64], interrupts: int - ) -> Int32Field: - return Int32Field(int((velocity / interrupts) * (2**31))) +class MoveDispatcher: + def __init__(self, scheduled_moves: List[_MoveGroupInfo], move_durations: List[float], start_index: int = 0): + self._moves = scheduled_moves + self._durations = move_durations + self._start_index = start_index - @classmethod - def convert_acceleration( - cls, acceleration: Union[float, np.float64], interrupts: int - ) -> Int32Field: - return Int32Field( - int((acceleration * 1000.0 / (interrupts ** 2)) * (2**31)) - ) - - @classmethod - def get_message_type( - cls, step: SingleMoveStep, group: int, seq: int, ignore_stalls: bool = False, - ) -> MessageDefinition: - """Return the correct payload type.""" - if isinstance(step, MoveGroupSingleAxisStep): - return cls.get_stepper_motor_message(step, group, seq, ignore_stalls) - elif isinstance(step, MoveGroupTipActionStep): - return cls.get_tip_action_motor_message(step, group, seq) - else: - return cls.get_brushed_motor_message(step, group, seq) - - @classmethod - def get_brushed_motor_message( - cls, step: MoveGroupSingleGripperStep, group: int, seq: int - ) -> MessageDefinition: - payload = GripperMoveRequestPayload( - group_id=UInt8Field(group), - seq_id=UInt8Field(seq), - duration=UInt32Field( - int(step.duration_sec * BRUSHED_MOTOR_INTERRUPTS_PER_SEC) - ), - duty_cycle=UInt32Field(int(step.pwm_duty_cycle)), - encoder_position_um=Int32Field(int(step.encoder_position_um)), - ) - if step.move_type == MoveType.home: - return GripperHomeRequest(payload=payload) - elif step.move_type == MoveType.grip: - return GripperGripRequest(payload=payload) - else: - return AddBrushedLinearMoveRequest(payload=payload) - - @classmethod - def get_stepper_motor_message( - cls, step: MoveGroupSingleAxisStep, group: int, seq: int, ignore_stalls: bool = False, - ) -> MessageDefinition: - if step.move_type == MoveType.home: - home_payload = HomeRequestPayload( - group_id=UInt8Field(group), - seq_id=UInt8Field(seq), - duration=UInt32Field(int(step.duration_sec * INTERRUPTS_PER_SEC)), - velocity_mm=cls.convert_velocity( - step.velocity_mm_sec, INTERRUPTS_PER_SEC - ), - ) - return HomeRequest(payload=home_payload) - else: - stop_cond = step.stop_condition.value - if ignore_stalls: - stop_cond += MoveStopCondition.ignore_stalls.value - linear_payload = AddLinearMoveRequestPayload( - request_stop_condition=MoveStopConditionField(stop_cond), - group_id=UInt8Field(group), - seq_id=UInt8Field(seq), - duration=UInt32Field(int(step.duration_sec * INTERRUPTS_PER_SEC)), - acceleration_um=cls.convert_acceleration( - step.acceleration_mm_sec_sq, INTERRUPTS_PER_SEC - ), - velocity_mm=cls.convert_velocity( - step.velocity_mm_sec, INTERRUPTS_PER_SEC - ), - ) - return AddLinearMoveRequest(payload=linear_payload) - - @classmethod - def get_tip_action_motor_message( - cls, step: MoveGroupTipActionStep, group: int, seq: int - ) -> TipActionRequest: - tip_action_payload = TipActionRequestPayload( - group_id=UInt8Field(group), - seq_id=UInt8Field(seq), - duration=UInt32Field(int(step.duration_sec * TIP_INTERRUPTS_PER_SEC)), - velocity=cls.convert_velocity( - step.velocity_mm_sec, TIP_INTERRUPTS_PER_SEC - ), - action=PipetteTipActionTypeField(step.action), - request_stop_condition=MoveStopConditionField(step.stop_condition), - ) - return TipActionRequest(payload=tip_action_payload) - - async def _move( - self, can_messenger: CanMessenger, start_at_index: int - ) -> _Completions: - """Run all the move groups.""" - scheduler = MoveScheduler(self._move_groups, start_at_index) - try: - can_messenger.add_listener(scheduler) - completions = await scheduler.run(can_messenger) - finally: - can_messenger.remove_listener(scheduler) - return completions - - -class MoveScheduler: - """A message listener that manages the sending of execute move group messages.""" - - def __init__(self, move_groups: MoveGroups, start_at_index: int = 0) -> None: - """Constructor.""" - # For each move group create a set identifying the node and seq id and stop condition. - self._moves: List[_MoveGroupInfo] = [] - self._durations: List[float] = [] - for group_id, move_group in enumerate(move_groups): - if group_id < start_at_index: - # we don't care about the moves before the start index, - # just need to preserve the index - self._moves.append([]) - self._durations.append(0.0) - else: - group_info = self._get_group_info(move_group) - self._moves.append(group_info) - self._durations.append(sum(step.duration for step in group_info)) - - self._start_index = start_at_index - log.debug(f"Move scheduler running for groups {move_groups}") + log.debug(f"Move scheduler running for groups {scheduled_moves}") self._completion_queue: asyncio.Queue[_CompletionPacket] = asyncio.Queue() self._event = asyncio.Event() self._error: Optional[ErrorMessage] = None @@ -388,22 +111,6 @@ def __init__(self, move_groups: MoveGroups, start_at_index: int = 0) -> None: self._current_nodes: List[NodeId] = [] self._should_stop = False - @staticmethod - def _get_group_info(move_group: MoveGroup) -> _MoveGroupInfo: - group_info: _MoveGroupInfo = [] - for seq_id, node_move in enumerate(move_group): - for node, move in node_move.items(): - node_seq = ScheduledMove( - seq_id=seq_id, - node_id=node.value, - stop_condition=move.stop_condition, - duration=float(move.duration_sec), - tip_action_motors=([ - GearMotorId.left, GearMotorId.right] if isinstance(move, MoveGroupTipActionStep) else [])) - group_info.append(node_seq) - return group_info - - def _handle_error( self, message: ErrorMessage, arbitration_id: ArbitrationId ) -> None: @@ -419,8 +126,8 @@ def _handle_move_responses( ) -> None: group_id = message.payload.group_id.value seq_id = message.payload.seq_id.value - ack_id = message.payload.ack_id.value node_id = arbitration_id.parts.originating_node_id + ack_id = message.payload.ack_id.value try: matched = next(filter(lambda m: m.seq_id == seq_id and m.node_id == node_id, self._moves[group_id]), None) @@ -494,16 +201,17 @@ async def _send_stop_if_necessary(self, can_messenger: CanMessenger) -> None: log.warning(f"Recoverable firmware error during {self._current_group_id}: {self._error}") @classmethod - def get_expected_nodes(cls, move_group: MoveGroup) -> Set[NodeId]: + def get_expected_nodes(cls, move_group: _MoveGroupInfo) -> Set[NodeId]: """Update the active nodes of the current move group.""" - return set(m.node_id for m in move_group) + return set(list(m.node_id for m in move_group)) async def run(self, can_messenger: CanMessenger) -> _Completions: """Start each move group after the prior has completed.""" - for group_id, move_group in enumerate(self._moves[self._start_index:], self._start_index): + for group_id, move_group in enumerate(self._schedule_moves[self._start_index:]): self._current_group_id = group_id self._current_nodes = list(self.get_expected_nodes(move_group)) + group_duration = self._scheduled_durations[group_id] log.debug(f"Executing move group {group_id}.") error = await can_messenger.ensure_send( @@ -523,10 +231,10 @@ async def run(self, can_messenger: CanMessenger) -> _Completions: log.error(f"recieved error trying to execute move group {str(error)}") expected_time = max( - 1.0, self._durations[group_id] * 1.1 + 1.0, group_duration * 1.1 ) full_timeout = max( - 1.0, self._durations[group_id] * 2 + 1.0, group_duration * 2 ) start_time = time.time() @@ -540,12 +248,12 @@ async def run(self, can_messenger: CanMessenger) -> _Completions: self._event.wait(), full_timeout, ) - duration = time.time() - start_time + time_elasped = time.time() - start_time await self._send_stop_if_necessary(can_messenger) - if duration >= expected_time: + if time_elasped >= expected_time: log.warning( - f"Move set {str(group_id)} took longer ({duration} seconds) than expected ({expected_time} seconds)." + f"Move set {str(group_id)} took longer ({time_elasped} seconds) than expected ({expected_time} seconds)." ) except asyncio.TimeoutError: log.warning( @@ -564,3 +272,162 @@ def _reify_queue_iter() -> Iterator[_CompletionPacket]: yield self._completion_queue.get_nowait() return list(_reify_queue_iter()) + + +class MoveScheduler: + def __init__(self, move_groups: MoveGroups, start_at_index: int = 0, ignore_stalls: bool = False): + self._move_groups = move_groups + self._start_at_index = start_at_index + self._scheduled: List[_MoveGroupInfo] = [[] for _ in move_groups] + self._group_durations: List[float] = [0.0 for _ in move_groups] + self._ignore_stalls = ignore_stalls + self._ready_for_dispatch = False + + @property + def ready_for_dispatch(self): + return self._ready_for_dispatch + + def has_moves(self) -> bool: + for move_group in self._move_groups[self._start_at_index:]: + for move in move_group: + for node, step in move.items(): + return True + return False + + async def schedule_groups(self, can_messenger: CanMessenger) -> None: + """Send commands to set up the message groups.""" + + for group_i, group in enumerate(self._move_groups): + if group_i < self._start_at_index: + # we don't care about the moves before the start index, + # just need to preserve the index + continue + + for seq_i, sequence in enumerate(group): + for node, step in sequence.items(): + message = self.get_message_type( + step, group_i + self._start_at_index, seq_i, self._ignore_stalls + ) + await can_messenger.send(node_id=node, message=message) + self._add_schedule(group_i, node, message) + self._add_sequence_duration(group_i, sequence.values[0].duration_sec) + self._ready_for_dispatch = True + + def _add_schedule(self, group_id: int, node: NodeId, message: _SchedulableRequests): + self._scheduled[group_id].append( + self._get_schedule_move(message, node) + ) + + def _add_sequence_duration(self, group_id: int, seq_duration: float): + self._group_durations[group_id] += seq_duration + + @staticmethod + def _get_schedule_move(message: _SchedulableRequests, node: NodeId) -> ScheduledMove: + return ScheduledMove( + seq_id=message.seq_id, + node_id=node.value, + stop_condition=message.payload.stop_condition, + duration=float(message.payload.duration), + tip_action_motors=([ + GearMotorId.left, GearMotorId.right] if isinstance(message, MoveGroupTipActionStep) else []) + ) + + + @classmethod + def convert_velocity( + cls, velocity: Union[float, np.float64], interrupts: int + ) -> Int32Field: + return Int32Field(int((velocity / interrupts) * (2**31))) + + @classmethod + def convert_acceleration( + cls, acceleration: Union[float, np.float64], interrupts: int + ) -> Int32Field: + return Int32Field( + int((acceleration * 1000.0 / (interrupts ** 2)) * (2**31)) + ) + + @classmethod + def get_message_type( + cls, step: SingleMoveStep, group: int, seq: int, ignore_stalls: bool = False, + ) -> MessageDefinition: + """Return the correct payload type.""" + if isinstance(step, MoveGroupSingleAxisStep): + return cls.get_stepper_motor_message(step, group, seq, ignore_stalls) + elif isinstance(step, MoveGroupTipActionStep): + return cls.get_tip_action_motor_message(step, group, seq) + else: + return cls.get_brushed_motor_message(step, group, seq) + + @classmethod + def get_brushed_motor_message( + cls, step: MoveGroupSingleGripperStep, group: int, seq: int + ) -> MessageDefinition: + payload = GripperMoveRequestPayload( + group_id=UInt8Field(group), + seq_id=UInt8Field(seq), + duration=UInt32Field( + int(step.duration_sec * BRUSHED_MOTOR_INTERRUPTS_PER_SEC) + ), + duty_cycle=UInt32Field(int(step.pwm_duty_cycle)), + encoder_position_um=Int32Field(int(step.encoder_position_um)), + ) + if step.move_type == MoveType.home: + return GripperHomeRequest(payload=payload) + elif step.move_type == MoveType.grip: + return GripperGripRequest(payload=payload) + else: + return AddBrushedLinearMoveRequest(payload=payload) + + @classmethod + def get_stepper_motor_message( + cls, step: MoveGroupSingleAxisStep, group: int, seq: int, ignore_stalls: bool = False, + ) -> MessageDefinition: + if step.move_type == MoveType.home: + home_payload = HomeRequestPayload( + group_id=UInt8Field(group), + seq_id=UInt8Field(seq), + duration=UInt32Field(int(step.duration_sec * INTERRUPTS_PER_SEC)), + velocity_mm=cls.convert_velocity( + step.velocity_mm_sec, INTERRUPTS_PER_SEC + ), + ) + return HomeRequest(payload=home_payload) + else: + stop_cond = step.stop_condition.value + if ignore_stalls: + stop_cond += MoveStopCondition.ignore_stalls.value + linear_payload = AddLinearMoveRequestPayload( + request_stop_condition=MoveStopConditionField(stop_cond), + group_id=UInt8Field(group), + seq_id=UInt8Field(seq), + duration=UInt32Field(int(step.duration_sec * INTERRUPTS_PER_SEC)), + acceleration_um=cls.convert_acceleration( + step.acceleration_mm_sec_sq, INTERRUPTS_PER_SEC + ), + velocity_mm=cls.convert_velocity( + step.velocity_mm_sec, INTERRUPTS_PER_SEC + ), + ) + return AddLinearMoveRequest(payload=linear_payload) + + @classmethod + def get_tip_action_motor_message( + cls, step: MoveGroupTipActionStep, group: int, seq: int + ) -> TipActionRequest: + tip_action_payload = TipActionRequestPayload( + group_id=UInt8Field(group), + seq_id=UInt8Field(seq), + duration=UInt32Field(int(step.duration_sec * TIP_INTERRUPTS_PER_SEC)), + velocity=cls.convert_velocity( + step.velocity_mm_sec, TIP_INTERRUPTS_PER_SEC + ), + action=PipetteTipActionTypeField(step.action), + request_stop_condition=MoveStopConditionField(step.stop_condition), + ) + return TipActionRequest(payload=tip_action_payload) + + def build_distpatcher(self) -> MoveDispatcher: + if not self._ready_for_dispatch: + raise RuntimeError("Move groups must be scheduled before dispatcher") + return MoveDispatcher(self._move_groups, self._group_durations, self._start_at_index)