-
Notifications
You must be signed in to change notification settings - Fork 178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(api): add deck conflict checks for pipetting with partial tip configuration #14066
Changes from all commits
9220919
5537a48
4fcdf9b
b28f5fa
de1d5a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,51 @@ | ||
"""A Protocol-Engine-friendly wrapper for opentrons.motion_planning.deck_conflict.""" | ||
|
||
import itertools | ||
from typing import Collection, Dict, Optional, Tuple, overload | ||
import logging | ||
from typing import Collection, Dict, Optional, Tuple, overload, Union | ||
|
||
from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError | ||
|
||
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType | ||
from opentrons.hardware_control.modules.types import ModuleType | ||
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict | ||
from opentrons.motion_planning.adjacent_slots_getters import ( | ||
get_north_slot, | ||
get_west_slot, | ||
) | ||
from opentrons.protocol_engine import ( | ||
StateView, | ||
DeckSlotLocation, | ||
ModuleLocation, | ||
OnLabwareLocation, | ||
AddressableAreaLocation, | ||
OFF_DECK_LOCATION, | ||
WellLocation, | ||
DropTipWellLocation, | ||
) | ||
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError | ||
from opentrons.types import DeckSlotName | ||
from opentrons.types import DeckSlotName, Point | ||
|
||
|
||
class PartialTipMovementNotAllowedError(MotionPlanningFailureError): | ||
"""Error raised when trying to perform a partial tip movement to an illegal location.""" | ||
|
||
def __init__(self, message: str) -> None: | ||
super().__init__( | ||
message=message, | ||
) | ||
|
||
|
||
_log = logging.getLogger(__name__) | ||
|
||
# TODO (spp, 2023-12-06): move this to a location like motion planning where we can | ||
# derive these values from geometry definitions | ||
# Bounding box measurements | ||
A12_column_front_left_bound = Point(x=-1.8, y=2) | ||
A12_column_back_right_bound = Point(x=592, y=506.2) | ||
|
||
# Arbitrary safety margin in z-direction | ||
Z_SAFETY_MARGIN = 10 | ||
|
||
|
||
@overload | ||
|
@@ -106,6 +137,201 @@ def check( | |
) | ||
|
||
|
||
def check_safe_for_pipette_movement( | ||
engine_state: StateView, | ||
pipette_id: str, | ||
labware_id: str, | ||
well_name: str, | ||
well_location: Union[WellLocation, DropTipWellLocation], | ||
) -> None: | ||
"""Check if the labware is safe to move to with a pipette in partial tip configuration. | ||
Args: | ||
engine_state: engine state view | ||
pipette_id: ID of the pipette to be moved | ||
labware_id: ID of the labware we are moving to | ||
well_name: Name of the well to move to | ||
well_location: exact location within the well to move to | ||
""" | ||
# TODO: either hide unsupported configurations behind an advance setting | ||
# or log a warning that deck conflicts cannot be checked for tip config other than | ||
# column config with A12 primary nozzle for the 96 channel | ||
# or single tip config for 8-channel. | ||
if engine_state.pipettes.get_channels(pipette_id) == 96: | ||
_check_deck_conflict_for_96_channel( | ||
engine_state=engine_state, | ||
pipette_id=pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
elif engine_state.pipettes.get_channels(pipette_id) == 8: | ||
_check_deck_conflict_for_8_channel( | ||
engine_state=engine_state, | ||
pipette_id=pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
|
||
|
||
def _check_deck_conflict_for_96_channel( | ||
engine_state: StateView, | ||
pipette_id: str, | ||
labware_id: str, | ||
well_name: str, | ||
well_location: Union[WellLocation, DropTipWellLocation], | ||
) -> None: | ||
"""Check if there are any conflicts moving to the given labware with the configuration of 96-ch pipette.""" | ||
if not ( | ||
engine_state.pipettes.get_nozzle_layout_type(pipette_id) | ||
== NozzleConfigurationType.COLUMN | ||
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "A12" | ||
): | ||
# Checking deck conflicts only for 12th column config | ||
return | ||
|
||
if isinstance(well_location, DropTipWellLocation): | ||
# convert to WellLocation | ||
well_location = engine_state.geometry.get_checked_tip_drop_location( | ||
pipette_id=pipette_id, | ||
labware_id=labware_id, | ||
well_location=well_location, | ||
partially_configured=True, | ||
) | ||
|
||
well_location_point = engine_state.geometry.get_well_position( | ||
labware_id=labware_id, well_name=well_name, well_location=well_location | ||
) | ||
|
||
if not _is_within_pipette_extents( | ||
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point | ||
): | ||
raise PartialTipMovementNotAllowedError( | ||
"Requested motion with A12 nozzle column configuration" | ||
" is outside of robot bounds for the 96-channel." | ||
) | ||
|
||
labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id) | ||
west_slot_number = get_west_slot( | ||
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot)) | ||
) | ||
if west_slot_number is None: | ||
return | ||
|
||
west_slot = DeckSlotName.from_primitive( | ||
west_slot_number | ||
).to_equivalent_for_robot_type(engine_state.config.robot_type) | ||
|
||
west_slot_highest_z = engine_state.geometry.get_highest_z_in_slot( | ||
DeckSlotLocation(slotName=west_slot) | ||
) | ||
|
||
pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id) | ||
tip_length = pipette_tip.length if pipette_tip else 0.0 | ||
|
||
if ( | ||
west_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length | ||
): # a safe margin magic number | ||
raise PartialTipMovementNotAllowedError( | ||
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}" | ||
f" with a Column nozzle configuration will result in collision with" | ||
f" items in deck slot {west_slot}." | ||
) | ||
|
||
|
||
def _check_deck_conflict_for_8_channel( | ||
engine_state: StateView, | ||
pipette_id: str, | ||
labware_id: str, | ||
well_name: str, | ||
well_location: Union[WellLocation, DropTipWellLocation], | ||
) -> None: | ||
"""Check if there are any conflicts moving to the given labware with the configuration of 8-ch pipette.""" | ||
if not ( | ||
engine_state.pipettes.get_nozzle_layout_type(pipette_id) | ||
== NozzleConfigurationType.SINGLE | ||
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "H1" | ||
): | ||
# Checking deck conflicts only for H1 single tip config | ||
return | ||
|
||
if isinstance(well_location, DropTipWellLocation): | ||
# convert to WellLocation | ||
well_location = engine_state.geometry.get_checked_tip_drop_location( | ||
pipette_id=pipette_id, | ||
labware_id=labware_id, | ||
well_location=well_location, | ||
partially_configured=True, | ||
) | ||
|
||
well_location_point = engine_state.geometry.get_well_position( | ||
labware_id=labware_id, well_name=well_name, well_location=well_location | ||
) | ||
|
||
if not _is_within_pipette_extents( | ||
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point | ||
): | ||
# WARNING: (spp, 2023-11-30: this needs to be wired up to check for | ||
# 8-channel pipette extents on both OT2 & Flex!!) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it wired up for flex only right now? what's the failure case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's wired up to check the extents of only the 96-channel in rightmost column configuration. I don't have the 8-channel movement bounds for both OT2 & Flex. |
||
raise PartialTipMovementNotAllowedError( | ||
"Requested motion with single H1 nozzle configuration" | ||
" is outside of robot bounds for the 8-channel." | ||
) | ||
|
||
labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id) | ||
north_slot_number = get_north_slot( | ||
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot)) | ||
) | ||
if north_slot_number is None: | ||
return | ||
|
||
north_slot = DeckSlotName.from_primitive( | ||
north_slot_number | ||
).to_equivalent_for_robot_type(engine_state.config.robot_type) | ||
|
||
north_slot_highest_z = engine_state.geometry.get_highest_z_in_slot( | ||
DeckSlotLocation(slotName=north_slot) | ||
) | ||
|
||
pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id) | ||
tip_length = pipette_tip.length if pipette_tip else 0.0 | ||
|
||
if north_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length: | ||
raise PartialTipMovementNotAllowedError( | ||
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}" | ||
f" with a Single nozzle configuration will result in collision with" | ||
f" items in deck slot {north_slot}." | ||
) | ||
|
||
|
||
def _is_within_pipette_extents( | ||
engine_state: StateView, | ||
pipette_id: str, | ||
location: Point, | ||
) -> bool: | ||
"""Whether a given point is within the extents of a configured pipette on the specified robot.""" | ||
robot_type = engine_state.config.robot_type | ||
pipette_channels = engine_state.pipettes.get_channels(pipette_id) | ||
nozzle_config = engine_state.pipettes.get_nozzle_layout_type(pipette_id) | ||
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id) | ||
if robot_type == "OT-3 Standard": | ||
if ( | ||
pipette_channels == 96 | ||
and nozzle_config == NozzleConfigurationType.COLUMN | ||
and primary_nozzle == "A12" | ||
): | ||
return ( | ||
A12_column_front_left_bound.x | ||
< location.x | ||
< A12_column_back_right_bound.x | ||
and A12_column_front_left_bound.y | ||
< location.y | ||
< A12_column_back_right_bound.y | ||
) | ||
# TODO (spp, 2023-11-07): check for 8-channel nozzle H1 extents on Flex & OT2 | ||
return True | ||
|
||
|
||
def _map_labware( | ||
engine_state: StateView, | ||
labware_id: str, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ | |
from opentrons_shared_data.pipette.dev_types import PipetteNameType | ||
from opentrons.protocol_api._nozzle_layout import NozzleLayout | ||
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType | ||
from . import deck_conflict | ||
|
||
from ..instrument import AbstractInstrument | ||
from .well import WellCore | ||
|
@@ -141,7 +142,13 @@ def aspirate( | |
absolute_point=location.point, | ||
) | ||
) | ||
|
||
deck_conflict.check_safe_for_pipette_movement( | ||
engine_state=self._engine_client.state, | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
self._engine_client.aspirate( | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
|
@@ -202,7 +209,13 @@ def dispense( | |
absolute_point=location.point, | ||
) | ||
) | ||
|
||
deck_conflict.check_safe_for_pipette_movement( | ||
engine_state=self._engine_client.state, | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
self._engine_client.dispense( | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
|
@@ -252,7 +265,13 @@ def blow_out( | |
absolute_point=location.point, | ||
) | ||
) | ||
|
||
deck_conflict.check_safe_for_pipette_movement( | ||
engine_state=self._engine_client.state, | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
self._engine_client.blow_out( | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
|
@@ -289,7 +308,13 @@ def touch_tip( | |
well_location = WellLocation( | ||
origin=WellOrigin.TOP, offset=WellOffset(x=0, y=0, z=z_offset) | ||
) | ||
|
||
deck_conflict.check_safe_for_pipette_movement( | ||
engine_state=self._engine_client.state, | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
self._engine_client.touch_tip( | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
|
@@ -331,7 +356,13 @@ def pick_up_tip( | |
well_name=well_name, | ||
absolute_point=location.point, | ||
) | ||
|
||
deck_conflict.check_safe_for_pipette_movement( | ||
engine_state=self._engine_client.state, | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=well_location, | ||
) | ||
self._engine_client.pick_up_tip( | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
|
@@ -376,7 +407,13 @@ def drop_tip( | |
) | ||
else: | ||
well_location = DropTipWellLocation() | ||
|
||
deck_conflict.check_safe_for_pipette_movement( | ||
engine_state=self._engine_client.state, | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
well_name=well_name, | ||
well_location=WellLocation(), | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self._engine_client.drop_tip( | ||
pipette_id=self._pipette_id, | ||
labware_id=labware_id, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a todo to push this somewhere else because I would like to derive this from geometry definitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep yep