Skip to content

Commit

Permalink
fix(engine): take calibrated tip length into account during tip pickup (
Browse files Browse the repository at this point in the history
#9246)

Closes #9220
  • Loading branch information
mcous authored Jan 13, 2022
1 parent 411c8db commit 7866767
Show file tree
Hide file tree
Showing 14 changed files with 510 additions and 172 deletions.
1 change: 1 addition & 0 deletions api/src/opentrons/calibration_storage/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def _get_tip_length_data(
)


# TODO(mc, 2022-01-12): no longer used; remove
def get_labware_calibration(
lookup_path: local_types.StrPath,
definition: "LabwareDefinition",
Expand Down
1 change: 1 addition & 0 deletions api/src/opentrons/calibration_storage/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def hash_labware_def(labware_def: "LabwareDefinition") -> str:
blocklist = ["metadata", "brand", "groups"]
def_no_metadata = {k: v for k, v in labware_def.items() if k not in blocklist}
sorted_def_str = json.dumps(def_no_metadata, sort_keys=True, separators=(",", ":"))

return sha256(sorted_def_str.encode("utf-8")).hexdigest()


Expand Down
29 changes: 9 additions & 20 deletions api/src/opentrons/protocol_engine/execution/hardware_stopper.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,36 +49,25 @@ async def _drop_tip(self) -> None:
axes=[MotorAxis.X, MotorAxis.Y, MotorAxis.LEFT_Z, MotorAxis.RIGHT_Z]
)

for pip_id, tiprack_id in attached_tip_racks.items():
for pipette_id, tiprack_id in attached_tip_racks.items():
try:
hw_pipette = self._state_store.pipettes.get_hardware_pipette(
pipette_id=pip_id,
attached_pipettes=self._hardware_api.attached_instruments,
)

except PipetteNotAttachedError:
# this will happen normally during protocol analysis, but
# should not happen during an actual run
log.debug(f"Pipette ID {pip_id} no longer attached.")

else:
tip_geometry = self._state_store.geometry.get_tip_geometry(
await self._pipetting_handler.add_tip(
pipette_id=pipette_id,
labware_id=tiprack_id,
well_name="A1",
pipette_config=hw_pipette.config,
)
await self._hardware_api.add_tip(
mount=hw_pipette.mount,
tip_length=tip_geometry.effective_length,
)
# TODO: Add ability to drop tip onto custom trash as well.
await self._pipetting_handler.drop_tip(
pipette_id=pip_id,
pipette_id=pipette_id,
labware_id=FIXED_TRASH_ID,
well_name="A1",
well_location=WellLocation(),
)

except PipetteNotAttachedError:
# this will happen normally during protocol analysis, but
# should not happen during an actual run
log.debug(f"Pipette ID {pipette_id} no longer attached.")

async def do_halt(self) -> None:
"""Issue a halt signal to the hardware API.
Expand Down
87 changes: 75 additions & 12 deletions api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
"""Pipetting command handling."""
from typing import NamedTuple, Optional

from opentrons.types import Mount as HardwareMount
from opentrons.hardware_control import HardwareControlAPI

from ..state import StateStore, CurrentWell
from ..resources import LabwareDataProvider
from ..types import WellLocation, WellOrigin
from .movement import MovementHandler


class _TipPickupData(NamedTuple):
hw_mount: HardwareMount
tip_length: float
tip_diameter: float
tip_volume: int


class PipettingHandler:
"""Implementation logic for liquid handling commands."""

Expand All @@ -18,33 +29,68 @@ def __init__(
state_store: StateStore,
hardware_api: HardwareControlAPI,
movement_handler: MovementHandler,
labware_data_provider: Optional[LabwareDataProvider] = None,
) -> None:
"""Initialize a PipettingHandler instance."""
self._state_store = state_store
self._hardware_api = hardware_api
self._movement_handler = movement_handler
self._labware_data_provider = labware_data_provider or LabwareDataProvider()

async def pick_up_tip(
async def _get_tip_details(
self,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: WellLocation,
) -> None:
"""Pick up a tip at the specified "well"."""
well_name: Optional[str] = None,
) -> _TipPickupData:
"""Retrieve data needed by the HardwareAPI for a tip pickup."""
# get mount and config data from state and hardware controller
hw_pipette = self._state_store.pipettes.get_hardware_pipette(
pipette_id=pipette_id,
attached_pipettes=self._hardware_api.attached_instruments,
)

# get the requested tip rack's definition for pulling calibrated tip length
tip_rack_def = self._state_store.labware.get_definition(labware_id)

# use config data to get tip geometry (length, diameter, volume)
tip_geometry = self._state_store.geometry.get_tip_geometry(
nominal_tip_geometry = self._state_store.geometry.get_nominal_tip_geometry(
labware_id=labware_id,
well_name=well_name,
pipette_config=hw_pipette.config,
)

# TODO(mc, 2022-01-12): this call hits the filesystem, which has performance
# implications over the course of a protocol since most calls will be redundant
tip_length = await self._labware_data_provider.get_calibrated_tip_length(
pipette_serial=hw_pipette.config["pipette_id"],
labware_definition=tip_rack_def,
)

if tip_length is None:
tip_length = nominal_tip_geometry.effective_length

return _TipPickupData(
hw_mount=hw_pipette.mount,
tip_length=tip_length,
tip_diameter=nominal_tip_geometry.diameter,
tip_volume=nominal_tip_geometry.volume,
)

async def pick_up_tip(
self,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: WellLocation,
) -> None:
"""Pick up a tip at the specified "well"."""
hw_mount, tip_length, tip_diameter, tip_volume = await self._get_tip_details(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
)

# move the pipette to the top of the tip
await self._movement_handler.move_to_well(
pipette_id=pipette_id,
Expand All @@ -55,22 +101,39 @@ async def pick_up_tip(

# perform the tip pickup routine
await self._hardware_api.pick_up_tip(
mount=hw_pipette.mount,
tip_length=tip_geometry.effective_length,
mount=hw_mount,
tip_length=tip_length,
# TODO(mc, 2020-11-12): include these parameters in the request
presses=None,
increment=None,
)

# after a successful pickup, update the hardware controller state
self._hardware_api.set_current_tiprack_diameter(
mount=hw_pipette.mount,
tiprack_diameter=tip_geometry.diameter,
mount=hw_mount,
tiprack_diameter=tip_diameter,
)
self._hardware_api.set_working_volume(
mount=hw_pipette.mount,
tip_volume=tip_geometry.volume,
mount=hw_mount,
tip_volume=tip_volume,
)

async def add_tip(self, pipette_id: str, labware_id: str) -> None:
"""Manually add a tip to a pipette in the hardware API.
Used to enable a drop tip even if the HW API thinks no tip is attached.
"""
hw_mount, tip_length, tip_diameter, tip_volume = await self._get_tip_details(
pipette_id=pipette_id,
labware_id=labware_id,
)

await self._hardware_api.add_tip(mount=hw_mount, tip_length=tip_length)
self._hardware_api.set_current_tiprack_diameter(
mount=hw_mount,
tiprack_diameter=tip_diameter,
)
self._hardware_api.set_working_volume(mount=hw_mount, tip_volume=tip_volume)

async def drop_tip(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,81 @@
"""Labware data resource provider."""
"""Labware data resource provider.
This module is a wrapper around existing, but older, internal APIs to
abstract away rough edges until we can improve those underlying interfaces.
"""
import logging
from anyio import to_thread
from typing import Optional, cast

from opentrons_shared_data.labware.dev_types import LabwareDefinition as LabwareDefDict
from opentrons.protocols.models import LabwareDefinition
from opentrons.protocol_api.labware import get_labware_definition
from opentrons.protocols.labware import get_labware_definition
from opentrons.calibration_storage.get import load_tip_length_calibration
from opentrons.calibration_storage.types import TipLengthCalNotFound


log = logging.getLogger(__name__)


class LabwareDataProvider:
"""Labware data provider."""

# NOTE(mc, 2020-10-18): async to allow file reading and parsing to be
# async on a worker thread in the future
@staticmethod
async def get_labware_definition(
load_name: str,
namespace: str,
version: int,
) -> LabwareDefinition:
"""Get a labware definition given the labware's identification."""
"""Get a labware definition given the labware's identification.
Note: this method hits the filesystem, which will have performance
implications if it is called often.
"""
return await to_thread.run_sync(
LabwareDataProvider._get_labware_definition_sync,
load_name,
namespace,
version,
)

@staticmethod
def _get_labware_definition_sync(
load_name: str, namespace: str, version: int
) -> LabwareDefinition:
return LabwareDefinition.parse_obj(
get_labware_definition(load_name, namespace, version)
)

@staticmethod
async def get_calibrated_tip_length(
pipette_serial: str,
labware_definition: LabwareDefinition,
) -> Optional[float]:
"""Get the calibrated tip length of a tip rack / pipette pair.
Note: this method hits the filesystem, which will have performance
implications if it is called often.
"""
return await to_thread.run_sync(
LabwareDataProvider._get_calibrated_tip_length_sync,
pipette_serial,
labware_definition,
)

@staticmethod
def _get_calibrated_tip_length_sync(
pipette_serial: str,
labware_definition: LabwareDefinition,
) -> Optional[float]:
try:
return load_tip_length_calibration(
pip_id=pipette_serial,
definition=cast(
LabwareDefDict,
labware_definition.dict(exclude_none=True),
),
).tip_length

except TipLengthCalNotFound as e:
log.warn("No calibrated tip length found", exc_info=e)
return None
34 changes: 24 additions & 10 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@
DEFAULT_TIP_DROP_HEIGHT_FACTOR = 0.5


# TODO(mc, 2020-11-12): reconcile this data structure with WellGeometry
@dataclass(frozen=True)
class TipGeometry:
"""Tip geometry data."""
"""Nominal tip geometry data.
This data is loaded from definitions and configurations, and does
not take calibration values into account.
Props:
effective_length: The nominal working length (total length minus overlap)
of a tip, according to a tip rack and pipette's definitions.
diameter: Nominal tip diameter.
volume: Nominal volume capacity.
"""

effective_length: float
diameter: float
Expand Down Expand Up @@ -137,17 +146,17 @@ def _get_highest_z_from_labware_data(self, lw_data: LoadedLabware) -> float:
height_over_labware = self._modules.get_height_over_labware(module_id)
return labware_pos.z + z_dim + height_over_labware

# TODO(mc, 2020-11-12): reconcile with existing protocol logic and include
# data from tip-length calibration once v4.0.0 is in `edge`
def get_effective_tip_length(
def get_nominal_effective_tip_length(
self,
labware_id: str,
pipette_config: PipetteDict,
) -> float:
"""Given a labware and a pipette's config, get the effective tip length.
Effective tip length is the nominal tip length less the distance the
tip overlaps with the pipette nozzle.
tip overlaps with the pipette nozzle. This does not take calibrated
tip lengths into account. For calibrated data,
see `LabwareDataProvider.get_calibrated_tip_length`.
"""
labware_uri = self._labware.get_definition_uri(labware_id)
nominal_length = self._labware.get_tip_length(labware_id)
Expand All @@ -157,19 +166,24 @@ def get_effective_tip_length(

return nominal_length - overlap

# TODO(mc, 2020-11-12): reconcile with existing geometry logic
def get_tip_geometry(
def get_nominal_tip_geometry(
self,
labware_id: str,
well_name: str,
pipette_config: PipetteDict,
well_name: Optional[str] = None,
) -> TipGeometry:
"""Given a labware, well, and hardware pipette config, get the tip geometry.
Tip geometry includes effective tip length, tip diameter, and tip volume,
which is all data required by the hardware controller for proper tip handling.
This geometry data is based solely on labware and pipette definitions and
does not take calibrated tip lengths into account.
"""
effective_length = self.get_effective_tip_length(labware_id, pipette_config)
effective_length = self.get_nominal_effective_tip_length(
labware_id=labware_id,
pipette_config=pipette_config,
)
well_def = self._labware.get_well_definition(labware_id, well_name)

if well_def.shape != "circular":
Expand Down
11 changes: 9 additions & 2 deletions api/src/opentrons/protocol_engine/state/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,18 @@ def get_quirks(self, labware_id: str) -> List[str]:
def get_well_definition(
self,
labware_id: str,
well_name: str,
well_name: Optional[str] = None,
) -> WellDefinition:
"""Get a well's definition by labware and well identifier."""
"""Get a well's definition by labware and well name.
If `well_name` is omitted, the first well in the labware
will be used.
"""
definition = self.get_definition(labware_id)

if well_name is None:
well_name = definition.ordering[0][0]

try:
return definition.wells[well_name]
except KeyError as e:
Expand Down
Loading

0 comments on commit 7866767

Please sign in to comment.