Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): add deck conflict checks for pipetting with partial tip configuration #14066

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 221 additions & 2 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,42 @@
"""A Protocol-Engine-friendly wrapper for opentrons.motion_planning.deck_conflict."""

import itertools
from typing import Collection, Dict, Optional, Tuple, overload
import logging
from typing import Collection, Dict, Optional, Tuple, overload, Union

from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning.adjacent_slots_getters import (
get_north_slot,
get_west_slot,
)
from opentrons.protocol_engine import (
StateView,
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
OFF_DECK_LOCATION,
WellLocation,
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName
from opentrons.types import DeckSlotName, Point


class PartialTipMovementNotAllowedError(ValueError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make this an enumerated error

"""Error raised when trying to perform a partial tip movement to an illegal location."""


_log = logging.getLogger(__name__)

# Arbitrary safety margin in z-direction
Z_SAFETY_MARGIN = 10

# Bounding box measurements
A12_column_front_left_bound = Point(x=-1.8, y=2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a todo to push this somewhere else because I would like to derive this from geometry definitions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep yep

A12_column_back_right_bound = Point(x=592, y=506.2)


@overload
Expand Down Expand Up @@ -106,6 +128,203 @@ def check(
)


def check_safe_for_pipette_movement(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if the labware is safe to move to with a pipette in partial tip configuration.
Args:
engine_state: engine state view
pipette_id: ID of the pipette to be moved
labware_id: ID of the labware we are moving to
well_name: Name of the well to move to
well_location: exact location within the well to move to
"""
# TODO: either hide unsupported configurations behind an advance setting
# or log a warning that deck conflicts cannot be checked for tip config other than
# column config with A12 primary nozzle for the 96 channel
# or single tip config for 8-channel.
if engine_state.pipettes.get_channels(pipette_id) == 96:
_check_deck_conflict_for_96_channel(
engine_state=engine_state,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
elif engine_state.pipettes.get_channels(pipette_id) == 8:
_check_deck_conflict_for_8_channel(
engine_state=engine_state,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)


def _check_deck_conflict_for_96_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 96-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.COLUMN
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "A12"
):
# Checking deck conflicts only for 12th column config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
raise PartialTipMovementNotAllowedError(
"Requested motion with A12 nozzle column configuration"
" is outside of robot bounds for the 96-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
west_slot_number = get_west_slot(
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot))
)
if west_slot_number is None:
return

west_slot = DeckSlotName.from_primitive(
west_slot_number
).to_equivalent_for_robot_type(engine_state.config.robot_type)

west_slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=west_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if (
west_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length
): # a safe margin magic number
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}"
f" with a Column nozzle configuration will result in collision with"
f" items in deck slot {west_slot}."
)


def _check_deck_conflict_for_8_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 8-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.SINGLE
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "H1"
):
# Checking deck conflicts only for H1 single tip config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
# WARNING: (spp, 2023-11-30: this needs to be wired up to check for
# 8-channel pipette extents on both OT2 & Flex!!)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it wired up for flex only right now? what's the failure case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's wired up to check the extents of only the 96-channel in rightmost column configuration. I don't have the 8-channel movement bounds for both OT2 & Flex.
So failure case is if you configure an 8-channel to use the frontmost nozzle and you made it move to well A1 of a labware in slot 11/ A1, then the pipette might crash against the back of the robot instead of raising an error that it's an unreachable location for the 8-channel in this configuration.

raise PartialTipMovementNotAllowedError(
"Requested motion with single H1 nozzle configuration"
" is outside of robot bounds for the 8-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
north_slot_number = get_north_slot(
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot))
)
if north_slot_number is None:
return

north_slot = DeckSlotName.from_primitive(
north_slot_number
).to_equivalent_for_robot_type(engine_state.config.robot_type)

north_slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=north_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if (
north_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length
): # a safe margin magic number
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}"
f" with a Single nozzle configuration will result in collision with"
f" items in deck slot {north_slot}."
)


def _is_within_pipette_extents(
engine_state: StateView,
pipette_id: str,
location: Point,
) -> bool:
"""Whether a given point is within the extents of a configured pipette on the specified robot."""
robot_type = engine_state.config.robot_type
pipette_channels = engine_state.pipettes.get_channels(pipette_id)
nozzle_config = engine_state.pipettes.get_nozzle_layout_type(pipette_id)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)
if robot_type == "OT-3 Standard":
if (
pipette_channels == 96
and nozzle_config == NozzleConfigurationType.COLUMN
and primary_nozzle == "A12"
):
return (
A12_column_front_left_bound.x
< location.x
< A12_column_back_right_bound.x
and A12_column_front_left_bound.y
< location.y
< A12_column_back_right_bound.y
)
# TODO (spp, 2023-11-07): check for 8-channel nozzle H1 extents on Flex & OT2
return True


def _map_labware(
engine_state: StateView,
labware_id: str,
Expand Down
49 changes: 43 additions & 6 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opentrons_shared_data.pipette.dev_types import PipetteNameType
from opentrons.protocol_api._nozzle_layout import NozzleLayout
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from . import deck_conflict

from ..instrument import AbstractInstrument
from .well import WellCore
Expand Down Expand Up @@ -141,7 +142,13 @@ def aspirate(
absolute_point=location.point,
)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.aspirate(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -202,7 +209,13 @@ def dispense(
absolute_point=location.point,
)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.dispense(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -252,7 +265,13 @@ def blow_out(
absolute_point=location.point,
)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.blow_out(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -289,7 +308,13 @@ def touch_tip(
well_location = WellLocation(
origin=WellOrigin.TOP, offset=WellOffset(x=0, y=0, z=z_offset)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.touch_tip(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -331,7 +356,13 @@ def pick_up_tip(
well_name=well_name,
absolute_point=location.point,
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.pick_up_tip(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -376,7 +407,13 @@ def drop_tip(
)
else:
well_location = DropTipWellLocation()

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(),
)
Copy link
Member Author

@sanni-t sanni-t Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I missed adding the check to move_to. Do we want to allow move_to to bypass this check?
Nevermind. move_to is for moving to non-well locations, which makes it checking for collisions with surrounding deck items non-trivial.

self._engine_client.drop_tip(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down
Loading
Loading