Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): use critical point instead of primary nozzle when doing deck conflict check #16268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError
from opentrons_shared_data.module import FLEX_TC_LID_COLLISION_ZONE

from opentrons.hardware_control import CriticalPoint
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning import adjacent_slots_getters
Expand Down Expand Up @@ -228,9 +229,13 @@ def check_safe_for_pipette_movement(
)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)

destination_cp = _get_critical_point_to_use(engine_state, labware_id)

pipette_bounds_at_well_location = (
engine_state.pipettes.get_pipette_bounds_at_specified_move_to_position(
pipette_id=pipette_id, destination_position=well_location_point
pipette_id=pipette_id,
destination_position=well_location_point,
critical_point=destination_cp,
)
)
if not _is_within_pipette_extents(
Expand Down Expand Up @@ -284,6 +289,21 @@ def check_safe_for_pipette_movement(
)


def _get_critical_point_to_use(
engine_state: StateView, labware_id: str
) -> Optional[CriticalPoint]:
"""Return the critical point to use when accessing the given labware."""
# TODO (spp, 2024-09-17): looks like Y_CENTER of column is the same as its XY_CENTER.
# I'm using this if-else ladder to be consistent with what we do in
# `MotionPlanning.get_movement_waypoints_to_well()`.
# We should probably use only XY_CENTER in both places.
if engine_state.labware.get_should_center_column_on_target_well(labware_id):
return CriticalPoint.Y_CENTER
elif engine_state.labware.get_should_center_pipette_on_target_well(labware_id):
return CriticalPoint.XY_CENTER
return None


def _slot_has_potential_colliding_object(
engine_state: StateView,
pipette_bounds: Tuple[Point, Point, Point, Point],
Expand Down
65 changes: 45 additions & 20 deletions api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from opentrons_shared_data.pipette import pipette_definition
from opentrons.config.defaults_ot2 import Z_RETRACT_DISTANCE
from opentrons.hardware_control.dev_types import PipetteDict
from opentrons.hardware_control import CriticalPoint
from opentrons.hardware_control.nozzle_manager import (
NozzleConfigurationType,
NozzleMap,
Expand Down Expand Up @@ -795,17 +796,27 @@ def get_primary_nozzle(self, pipette_id: str) -> Optional[str]:
nozzle_map = self._state.nozzle_configuration_by_id.get(pipette_id)
return nozzle_map.starting_nozzle if nozzle_map else None

def get_primary_nozzle_offset(self, pipette_id: str) -> Point:
"""Get the pipette's current primary nozzle's offset."""
def _get_critical_point_offset_without_tip(
self, pipette_id: str, critical_point: Optional[CriticalPoint]
) -> Point:
"""Get the offset of the specified critical point from pipette's mount position."""
nozzle_map = self._state.nozzle_configuration_by_id.get(pipette_id)
if nozzle_map:
primary_nozzle_offset = nozzle_map.starting_nozzle_offset
else:
# When not in partial configuration, back-left nozzle is the primary
primary_nozzle_offset = self.get_config(
pipette_id
).bounding_nozzle_offsets.back_left_offset
return primary_nozzle_offset
# Nozzle map is unavailable only when there's no pipette loaded
# so this is merely for satisfying the type checker
assert (
nozzle_map is not None
), "Error getting critical point offset. Nozzle map not found."
match critical_point:
case CriticalPoint.INSTRUMENT_XY_CENTER:
return nozzle_map.instrument_xy_center_offset
case CriticalPoint.XY_CENTER:
return nozzle_map.xy_center_offset
case CriticalPoint.Y_CENTER:
return nozzle_map.y_center_offset
case CriticalPoint.FRONT_NOZZLE:
return nozzle_map.front_nozzle_offset
case _:
return nozzle_map.starting_nozzle_offset

def get_pipette_bounding_nozzle_offsets(
self, pipette_id: str
Expand All @@ -817,32 +828,46 @@ def get_pipette_bounding_box(self, pipette_id: str) -> PipetteBoundingBoxOffsets
"""Get the bounding box of the pipette."""
return self.get_config(pipette_id).pipette_bounding_box_offsets

# TODO (spp, 2024-09-17): in order to find the position of pipette at destination,
# this method repeats the same steps that waypoints builder does while finding
# waypoints to move to. We should consolidate these steps into a shared entity
# so that the deck conflict checker and movement plan builder always remain in sync.
def get_pipette_bounds_at_specified_move_to_position(
self,
pipette_id: str,
destination_position: Point,
critical_point: Optional[CriticalPoint],
) -> Tuple[Point, Point, Point, Point]:
"""Get the pipette's bounding offsets when primary nozzle is at the given position."""
primary_nozzle_offset = self.get_primary_nozzle_offset(pipette_id)
"""Get the pipette's bounding box position when critical point is at the destination position.

Returns a tuple of the pipette's bounding box position in deck coordinates as-
(back_left_bound, front_right_bound, back_right_bound, front_left_bound)
Bounding box of the pipette includes the pipette's outer casing as well as nozzles.
"""
tip = self.get_attached_tip(pipette_id)
# TODO update this for pipette robot stackup
# Primary nozzle position at destination, in deck coordinates
primary_nozzle_position = destination_position + Point(

# *Offset* of pipette's critical point w.r.t pipette mount
critical_point_offset = self._get_critical_point_offset_without_tip(
pipette_id, critical_point
)

# Position of the above critical point at destination, in deck coordinates
critical_point_position = destination_position + Point(
x=0, y=0, z=tip.length if tip else 0
)

# Get the pipette bounding box based on total nozzles
# Get the pipette bounding box coordinates
pipette_bounds_offsets = self.get_config(
pipette_id
).pipette_bounding_box_offsets
pip_back_left_bound = (
primary_nozzle_position
- primary_nozzle_offset
critical_point_position
- critical_point_offset
+ pipette_bounds_offsets.back_left_corner
)
pip_front_right_bound = (
primary_nozzle_position
- primary_nozzle_offset
critical_point_position
- critical_point_offset
+ pipette_bounds_offsets.front_right_corner
)
pip_back_right_bound = Point(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from opentrons_shared_data.labware.types import LabwareUri
from opentrons_shared_data.robot.types import RobotType

from opentrons.hardware_control import CriticalPoint
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning import adjacent_slots_getters
Expand Down Expand Up @@ -545,9 +546,21 @@ def test_deck_conflict_raises_for_bad_pipette_move(
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)
decoy.when(
mock_state_view.labware.get_should_center_column_on_target_well(
"destination-labware-id"
)
).then_return(False)
decoy.when(
mock_state_view.labware.get_should_center_pipette_on_target_well(
"destination-labware-id"
)
).then_return(False)
decoy.when(
mock_state_view.pipettes.get_pipette_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
pipette_id="pipette-id",
destination_position=destination_well_point,
critical_point=None,
)
).then_return(pipette_bounds)

Expand Down Expand Up @@ -653,9 +666,17 @@ def test_deck_conflict_raises_for_collision_with_tc_lid(
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)

decoy.when(
mock_state_view.labware.get_should_center_column_on_target_well(
"destination-labware-id"
)
).then_return(True)
decoy.when(
mock_state_view.pipettes.get_pipette_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
pipette_id="pipette-id",
destination_position=destination_well_point,
critical_point=CriticalPoint.Y_CENTER,
)
).then_return(pipette_bounds_at_destination)
decoy.when(mock_state_view.pipettes.get_mount("pipette-id")).then_return(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from opentrons import simulate
from opentrons.protocol_api import COLUMN, ALL, SINGLE
from opentrons.protocol_api import COLUMN, ALL, SINGLE, ROW
from opentrons.protocol_api.core.engine.deck_conflict import (
PartialTipMovementNotAllowedError,
)
Expand Down Expand Up @@ -226,3 +226,104 @@ def test_deck_conflicts_for_96_ch_a1_column_configuration() -> None:

# No error NOW because of full config
instrument.dispense(50, badly_placed_plate.wells_by_name()["A1"].bottom())


@pytest.mark.ot3_only
def test_deck_conflicts_for_96_ch_and_reservoirs() -> None:
"""It should raise errors for expected deck conflicts when moving to reservoirs.

This test checks that the critical point of the pipette is taken into account,
specifically when it differs from the primary nozzle.
"""
protocol = simulate.get_protocol_api(version="2.20", robot_type="Flex")
instrument = protocol.load_instrument("flex_96channel_1000", mount="left")
# trash_labware = protocol.load_labware("opentrons_1_trash_3200ml_fixed", "A3")
# instrument.trash_container = trash_labware

protocol.load_trash_bin("A3")
right_tiprack = protocol.load_labware("opentrons_flex_96_tiprack_50ul", "C3")
front_tiprack = protocol.load_labware("opentrons_flex_96_tiprack_50ul", "D2")
# Tall deck item in B3
protocol.load_labware(
"opentrons_flex_96_tiprack_50ul",
"B3",
adapter="opentrons_flex_96_tiprack_adapter",
)
# Tall deck item in B1
protocol.load_labware(
"opentrons_flex_96_tiprack_50ul",
"B1",
adapter="opentrons_flex_96_tiprack_adapter",
)

# ############ RESERVOIRS ################
# These labware should be to the east of tall labware to avoid any partial tip deck conflicts
reservoir_1_well = protocol.load_labware("nest_1_reservoir_195ml", "C2")
reservoir_12_well = protocol.load_labware("nest_12_reservoir_15ml", "B2")

# ########### Use COLUMN A1 Config #############
instrument.configure_nozzle_layout(style=COLUMN, start="A1")

instrument.pick_up_tip(front_tiprack.wells_by_name()["A12"])

with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.aspirate(10, reservoir_1_well.wells()[0])

instrument.aspirate(25, reservoir_12_well.wells()[0])
instrument.dispense(10, reservoir_12_well.wells()[1])

with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.dispense(15, reservoir_12_well.wells()[3])

instrument.drop_tip()
front_tiprack.reset()

# ########### Use COLUMN A12 Config #############
instrument.configure_nozzle_layout(style=COLUMN, start="A12")

instrument.pick_up_tip(front_tiprack.wells_by_name()["A1"])
instrument.aspirate(50, reservoir_1_well.wells()[0])
with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.dispense(10, reservoir_12_well.wells()[8])

instrument.dispense(15, reservoir_12_well.wells()[11])
instrument.dispense(10, reservoir_1_well.wells()[0])

instrument.drop_tip()
front_tiprack.reset()

# ######## CHANGE CONFIG TO ROW H1 #########
instrument.configure_nozzle_layout(style=ROW, start="H1", tip_racks=[front_tiprack])
with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.pick_up_tip(right_tiprack.wells_by_name()["A1"])

instrument.pick_up_tip()
instrument.aspirate(25, reservoir_1_well.wells()[0])

instrument.drop_tip()
front_tiprack.reset()

# ######## CHANGE CONFIG TO ROW A1 #########
instrument.configure_nozzle_layout(style=ROW, start="A1", tip_racks=[front_tiprack])

with pytest.raises(
PartialTipMovementNotAllowedError, match="outside of robot bounds"
):
instrument.pick_up_tip()
instrument.pick_up_tip(right_tiprack.wells_by_name()["H1"])

with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.aspirate(25, reservoir_1_well.wells()[0])

instrument.drop_tip()
front_tiprack.reset()
Loading
Loading