Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1925,17 +1925,32 @@ async def drop_tips(
# get highest z position
max_z = max(op.resource.get_location_wrt(self.deck).z + op.offset.z for op in ops)
if drop_method == TipDropMethod.PLACE_SHIFT:
# magic values empirically found in https://github.com/PyLabRobot/pylabrobot/pull/63
begin_tip_deposit_process = (
round((max_z + 59.9) * 10)
if begin_tip_deposit_process is None
else round(begin_tip_deposit_process * 10)
)
end_tip_deposit_process = (
round((max_z + 49.9) * 10)
if end_tip_deposit_process is None
else round(end_tip_deposit_process * 10)
)
use_addressable_waste_z = all(isinstance(op.resource, Trash) for op in ops) and all(
getattr(op.resource, "category", None) == "waste_position" for op in ops
)
if use_addressable_waste_z:
begin_tip_deposit_process = (
round((max_z + 10) * 10)
if begin_tip_deposit_process is None
else round(begin_tip_deposit_process * 10)
)
end_tip_deposit_process = (
round(max_z * 10)
if end_tip_deposit_process is None
else round(end_tip_deposit_process * 10)
)
else:
# magic values empirically found in https://github.com/PyLabRobot/pylabrobot/pull/63
begin_tip_deposit_process = (
round((max_z + 59.9) * 10)
if begin_tip_deposit_process is None
else round(begin_tip_deposit_process * 10)
)
end_tip_deposit_process = (
round((max_z + 49.9) * 10)
if end_tip_deposit_process is None
else round(end_tip_deposit_process * 10)
)
else:
max_total_tip_length = max(op.tip.total_tip_length for op in ops)
max_tip_length = max((op.tip.total_tip_length - op.tip.fitting_depth) for op in ops)
Expand Down
22 changes: 21 additions & 1 deletion pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def asyncSetUp(self):
self.STAR.io.write = unittest.mock.MagicMock()
self.STAR.io.read = unittest.mock.MagicMock()

self.deck = STARLetDeck()
self.deck = STARLetDeck(waste_positions=None)
self.lh = LiquidHandler(self.STAR, deck=self.deck)

self.tip_car = TIP_CAR_480_A00(name="tip carrier")
Expand Down Expand Up @@ -1015,6 +1015,26 @@ async def test_discard_tips(self):
]
)

async def test_discard_tips_to_waste_positions(self):
"""With addressable waste positions, discard_tips uses them and waste-spot z (tp1970tz1870)."""
deck = STARLetDeck()
tip_car = TIP_CAR_480_A00(name="tip carrier")
tip_car[1] = tip_rack = hamilton_96_tiprack_300uL_filter(name="tip_rack_01")
deck.assign_child_resource(tip_car, rails=1)
lh = LiquidHandler(self.STAR, deck=deck)
await lh.setup()

await lh.pick_up_tips(tip_rack["A1:H1"])
self.STAR._write_and_read_command.reset_mock()
await lh.discard_tips()
self.STAR._write_and_read_command.assert_has_calls(
[
_any_write_and_read_command_call(
"C0TRid0003xp08000 08000 08000 08000 08000 08000 08000 08000yp4050 3800 3550 3300 3050 2800 2550 2300tm1 1 1 1 1 1 1 1tp1970tz1870th2450te2450ti0",
)
]
)

async def test_portrait_tip_rack_handling(self):
deck = STARLetDeck()
lh = LiquidHandler(self.STAR, deck=deck)
Expand Down
41 changes: 29 additions & 12 deletions pylabrobot/liquid_handling/liquid_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from pylabrobot.liquid_handling.utils import (
get_tight_single_resource_liquid_op_offsets,
get_waste_positions_for_n_channels,
get_wide_single_resource_liquid_op_offsets,
)
from pylabrobot.machines.machine import Machine, need_setup_finished
Expand Down Expand Up @@ -146,6 +147,7 @@ def __init__(
super().assign_child_resource(deck, location=deck.location or Coordinate.zero())

self._resource_pickups: Dict[int, Optional[ResourcePickup]] = {}
self._channel_waste_positions: Optional[List[Trash]] = None # set in setup()

@property
def _resource_pickup(self) -> Optional[ResourcePickup]:
Expand Down Expand Up @@ -183,6 +185,11 @@ async def setup(self, **backend_kwargs):

self._resource_pickups = {a: None for a in range(self.backend.num_arms)}

positions = self.deck.get_waste_positions()
self._channel_waste_positions = get_waste_positions_for_n_channels(
positions, self.backend.num_channels
)

def serialize_state(self) -> Dict[str, Any]:
"""Serialize the state of this liquid handler. Use :meth:`~Resource.serialize_all_states` to
serialize the state of the liquid handler and all children (the deck)."""
Expand Down Expand Up @@ -767,20 +774,30 @@ async def discard_tips(
if n == 0:
raise RuntimeError("No tips have been picked up and no channels were specified.")

trash = self.deck.get_trash_area()
trash_offsets = get_tight_single_resource_liquid_op_offsets(
trash,
num_channels=n,
)
# add trash_offsets to offsets if defined, otherwise use trash_offsets
# too advanced for mypy
offsets = [
o + to if o is not None else to
for o, to in zip(offsets or [None] * n, trash_offsets) # type: ignore
]
if self._channel_waste_positions is None:
raise RuntimeError("Setup has not been run. Call LiquidHandler.setup() first.")

waste_spots = [self._channel_waste_positions[c] for c in use_channels]
if all(s is waste_spots[0] for s in waste_spots):
trash = waste_spots[0]
trash_offsets = get_tight_single_resource_liquid_op_offsets(
trash,
num_channels=n,
)
offsets = [
o + to if o is not None else to
for o, to in zip(offsets or [None] * n, trash_offsets) # type: ignore
]
return await self.drop_tips(
tip_spots=[trash] * n,
use_channels=use_channels,
offsets=offsets,
allow_nonzero_volume=allow_nonzero_volume,
**backend_kwargs,
)

return await self.drop_tips(
tip_spots=[trash] * n,
tip_spots=waste_spots,
use_channels=use_channels,
offsets=offsets,
allow_nonzero_volume=allow_nonzero_volume,
Expand Down
83 changes: 81 additions & 2 deletions pylabrobot/liquid_handling/liquid_handler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
Strictness,
set_strictness,
)
from pylabrobot.liquid_handling.utils import get_tight_single_resource_liquid_op_offsets
from pylabrobot.liquid_handling.utils import (
get_tight_single_resource_liquid_op_offsets,
get_waste_positions_for_n_channels,
)
from pylabrobot.resources import (
PLT_CAR_L5AC_A00,
TIP_CAR_480_A00,
Expand Down Expand Up @@ -473,12 +476,43 @@ def test_serialize(self):
)


class TestGetWastePositionsForNChannels(unittest.TestCase):
"""Tests for get_waste_positions_for_n_channels helper."""

def test_single_position_repeats_to_n(self):
deck = STARLetDeck(waste_positions=None)
positions = deck.get_waste_positions()
self.assertEqual(len(positions), 1)
result = get_waste_positions_for_n_channels(positions, 8)
self.assertEqual(len(result), 8)
self.assertTrue(all(r is positions[0] for r in result))

def test_sixteen_positions_subset_to_eight(self):
deck = STARLetDeck()
positions = deck.get_waste_positions()
self.assertEqual(len(positions), 16)
result = get_waste_positions_for_n_channels(positions, 8)
self.assertEqual(len(result), 8)
self.assertEqual(
[r.name for r in result],
[f"waste_position_{2 * i + 1}" for i in range(8)],
)

def test_n_greater_than_positions_raises(self):
deck = STARLetDeck()
positions = deck.get_waste_positions()
with self.assertRaises(ValueError) as ctx:
get_waste_positions_for_n_channels(positions, 20)
self.assertIn("Requested 20", str(ctx.exception))
self.assertIn("only has 16", str(ctx.exception))


class TestLiquidHandlerCommands(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.maxDiff = None

self.backend = _create_mock_backend(num_channels=8)
self.deck = STARLetDeck()
self.deck = STARLetDeck(waste_positions=None)
self.lh = LiquidHandler(backend=self.backend, deck=self.deck)

self.tip_rack = hamilton_96_tiprack_300uL_filter(name="tip_rack")
Expand All @@ -487,6 +521,51 @@ async def asyncSetUp(self):
self.deck.assign_child_resource(self.plate, location=Coordinate(100, 100, 0))
await self.lh.setup()

async def test_channel_waste_positions_set_at_setup(self):
"""After setup, _channel_waste_positions has length backend.num_channels."""
positions = self.lh._channel_waste_positions
self.assertIsNotNone(positions)
assert positions is not None # for mypy
self.assertEqual(len(positions), 8)
# Single trash deck: all entries are the same trash
self.assertTrue(all(r is positions[0] for r in positions))

async def test_discard_tips_before_setup_raises(self):
"""discard_tips before setup raises a clear error."""
backend = _create_mock_backend(num_channels=8)
deck = STARLetDeck(waste_positions=None)
lh = LiquidHandler(backend=backend, deck=deck)
with self.assertRaises(RuntimeError) as ctx:
await lh.discard_tips(use_channels=[0])
self.assertIn("Setup has not been run", str(ctx.exception))

async def test_discard_tips_uses_per_channel_waste_positions(self):
"""With addressable waste, discard_tips sends each channel to its assigned waste position."""
deck = STARLetDeck() # 16 addressable waste positions
backend = _create_mock_backend(num_channels=8)
lh = LiquidHandler(backend=backend, deck=deck)
tip_rack = hamilton_96_tiprack_300uL_filter(name="tip_rack")
deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
await lh.setup()

# Pick up on channels 0, 2, 5 only
await lh.pick_up_tips(
tip_rack["A1", "C1", "F1"],
use_channels=[0, 2, 5],
)
backend.drop_tips.reset_mock()
await lh.discard_tips(use_channels=[0, 2, 5])

# Each channel should go to its pre-assigned waste position (from setup), not first 3
call = backend.drop_tips.call_args
ops = call.kwargs["ops"]
use_channels = call.kwargs["use_channels"]
self.assertEqual(use_channels, [0, 2, 5])
self.assertEqual(len(ops), 3)
# With 8 channels over 16 positions: channel 0 -> waste_position_1, 2 -> waste_position_5, 5 -> waste_position_11
expected_names = ["waste_position_1", "waste_position_5", "waste_position_11"]
self.assertEqual([op.resource.name for op in ops], expected_names)

async def test_offsets_tips(self):
tip_spot = self.tip_rack.get_item("A1")
tip = tip_spot.get_tip()
Expand Down
12 changes: 12 additions & 0 deletions pylabrobot/liquid_handling/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pylabrobot.resources.coordinate import Coordinate
from pylabrobot.resources.resource import Resource
from pylabrobot.resources.trash import Trash

MIN_SPACING_BETWEEN_CHANNELS = 9.0
# minimum spacing between the edge of the container and the center of channel
Expand Down Expand Up @@ -68,3 +69,14 @@ def get_tight_single_resource_liquid_op_offsets(
)
for c in centers
]


def get_waste_positions_for_n_channels(positions: List[Trash], n: int) -> List[Trash]:
"""Return n waste positions from the deck's list: repeat if single, else subset evenly."""
if len(positions) == 1:
return [positions[0]] * n
if n > len(positions):
raise ValueError(f"Requested {n} waste positions but deck only has {len(positions)}.")
M = len(positions)
indices = [i * (M // n) for i in range(n)]
return [positions[j] for j in indices]
4 changes: 4 additions & 0 deletions pylabrobot/resources/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ def summary(self) -> str:
def get_trash_area96(self) -> Trash:
deck_class = self.__class__.__name__
raise NotImplementedError(f"This method is not implemented by deck '{deck_class}'")

def get_waste_positions(self) -> List[Trash]:
"""Return the list of waste positions on this deck. Default: single trash area."""
return [cast(Trash, self.get_trash_area())]
30 changes: 30 additions & 0 deletions pylabrobot/resources/hamilton/hamilton_deck_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,43 @@ def test_summary(self):
(31) ├── waste_block Resource (775.000, 115.000, 100.000)
│ ├── teaching_tip_rack TipRack (780.900, 461.100, 100.000)
│ ├── waste_position_1 Trash (800.000, 405.000, 187.000)
│ ├── waste_position_2 Trash (800.000, 392.500, 187.000)
│ ├── waste_position_3 Trash (800.000, 380.000, 187.000)
│ ├── waste_position_4 Trash (800.000, 367.500, 187.000)
│ ├── waste_position_5 Trash (800.000, 355.000, 187.000)
│ ├── waste_position_6 Trash (800.000, 342.500, 187.000)
│ ├── waste_position_7 Trash (800.000, 330.000, 187.000)
│ ├── waste_position_8 Trash (800.000, 317.500, 187.000)
│ ├── waste_position_9 Trash (800.000, 305.000, 187.000)
│ ├── waste_position_10 Trash (800.000, 292.500, 187.000)
│ ├── waste_position_11 Trash (800.000, 280.000, 187.000)
│ ├── waste_position_12 Trash (800.000, 267.500, 187.000)
│ ├── waste_position_13 Trash (800.000, 255.000, 187.000)
│ ├── waste_position_14 Trash (800.000, 242.500, 187.000)
│ ├── waste_position_15 Trash (800.000, 230.000, 187.000)
│ ├── waste_position_16 Trash (800.000, 217.500, 187.000)
│ ├── core_grippers HamiltonCoreGrippers (797.500, 085.500, 205.000)
(32) ├── trash Trash (800.000, 190.600, 137.100)
"""[1:]
),
)

def test_get_waste_positions_default_returns_16(self):
"""Default STARLetDeck has 16 addressable waste positions."""
deck = STARLetDeck()
positions = deck.get_waste_positions()
self.assertEqual(len(positions), 16)
self.assertEqual([p.name for p in positions], [f"waste_position_{i}" for i in range(1, 17)])

def test_get_waste_positions_none_returns_single_trash(self):
"""STARLetDeck(waste_positions=None) returns single trash (same as get_trash_area)."""
deck = STARLetDeck(waste_positions=None)
positions = deck.get_waste_positions()
self.assertEqual(len(positions), 1)
self.assertIs(positions[0], deck.get_trash_area())

def test_assign_gigantic_resource(self):
stanley_cup = StanleyCup_QUENCHER_FLOWSTATE_TUMBLER(name="HUGE")
deck = STARLetDeck()
Expand Down
Loading