diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 0c3495a0e89..9c1c75d6467 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1925,17 +1925,32 @@ async def drop_tips( # get highest z position max_z = max(op.resource.get_location_wrt(self.deck).z + op.offset.z for op in ops) if drop_method == TipDropMethod.PLACE_SHIFT: - # magic values empirically found in https://github.com/PyLabRobot/pylabrobot/pull/63 - begin_tip_deposit_process = ( - round((max_z + 59.9) * 10) - if begin_tip_deposit_process is None - else round(begin_tip_deposit_process * 10) - ) - end_tip_deposit_process = ( - round((max_z + 49.9) * 10) - if end_tip_deposit_process is None - else round(end_tip_deposit_process * 10) - ) + use_addressable_waste_z = all(isinstance(op.resource, Trash) for op in ops) and all( + getattr(op.resource, "category", None) == "waste_position" for op in ops + ) + if use_addressable_waste_z: + begin_tip_deposit_process = ( + round((max_z + 10) * 10) + if begin_tip_deposit_process is None + else round(begin_tip_deposit_process * 10) + ) + end_tip_deposit_process = ( + round(max_z * 10) + if end_tip_deposit_process is None + else round(end_tip_deposit_process * 10) + ) + else: + # magic values empirically found in https://github.com/PyLabRobot/pylabrobot/pull/63 + begin_tip_deposit_process = ( + round((max_z + 59.9) * 10) + if begin_tip_deposit_process is None + else round(begin_tip_deposit_process * 10) + ) + end_tip_deposit_process = ( + round((max_z + 49.9) * 10) + if end_tip_deposit_process is None + else round(end_tip_deposit_process * 10) + ) else: max_total_tip_length = max(op.tip.total_tip_length for op in ops) max_tip_length = max((op.tip.total_tip_length - op.tip.fitting_depth) for op in ops) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index 48d462ca000..52162fb7fb1 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -213,7 +213,7 @@ async def asyncSetUp(self): self.STAR.io.write = unittest.mock.MagicMock() self.STAR.io.read = unittest.mock.MagicMock() - self.deck = STARLetDeck() + self.deck = STARLetDeck(waste_positions=None) self.lh = LiquidHandler(self.STAR, deck=self.deck) self.tip_car = TIP_CAR_480_A00(name="tip carrier") @@ -1015,6 +1015,26 @@ async def test_discard_tips(self): ] ) + async def test_discard_tips_to_waste_positions(self): + """With addressable waste positions, discard_tips uses them and waste-spot z (tp1970tz1870).""" + deck = STARLetDeck() + tip_car = TIP_CAR_480_A00(name="tip carrier") + tip_car[1] = tip_rack = hamilton_96_tiprack_300uL_filter(name="tip_rack_01") + deck.assign_child_resource(tip_car, rails=1) + lh = LiquidHandler(self.STAR, deck=deck) + await lh.setup() + + await lh.pick_up_tips(tip_rack["A1:H1"]) + self.STAR._write_and_read_command.reset_mock() + await lh.discard_tips() + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "C0TRid0003xp08000 08000 08000 08000 08000 08000 08000 08000yp4050 3800 3550 3300 3050 2800 2550 2300tm1 1 1 1 1 1 1 1tp1970tz1870th2450te2450ti0", + ) + ] + ) + async def test_portrait_tip_rack_handling(self): deck = STARLetDeck() lh = LiquidHandler(self.STAR, deck=deck) diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index 60eb2e8f11a..96b6f4e2513 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -31,6 +31,7 @@ ) from pylabrobot.liquid_handling.utils import ( get_tight_single_resource_liquid_op_offsets, + get_waste_positions_for_n_channels, get_wide_single_resource_liquid_op_offsets, ) from pylabrobot.machines.machine import Machine, need_setup_finished @@ -146,6 +147,7 @@ def __init__( super().assign_child_resource(deck, location=deck.location or Coordinate.zero()) self._resource_pickups: Dict[int, Optional[ResourcePickup]] = {} + self._channel_waste_positions: Optional[List[Trash]] = None # set in setup() @property def _resource_pickup(self) -> Optional[ResourcePickup]: @@ -183,6 +185,11 @@ async def setup(self, **backend_kwargs): self._resource_pickups = {a: None for a in range(self.backend.num_arms)} + positions = self.deck.get_waste_positions() + self._channel_waste_positions = get_waste_positions_for_n_channels( + positions, self.backend.num_channels + ) + def serialize_state(self) -> Dict[str, Any]: """Serialize the state of this liquid handler. Use :meth:`~Resource.serialize_all_states` to serialize the state of the liquid handler and all children (the deck).""" @@ -767,20 +774,30 @@ async def discard_tips( if n == 0: raise RuntimeError("No tips have been picked up and no channels were specified.") - trash = self.deck.get_trash_area() - trash_offsets = get_tight_single_resource_liquid_op_offsets( - trash, - num_channels=n, - ) - # add trash_offsets to offsets if defined, otherwise use trash_offsets - # too advanced for mypy - offsets = [ - o + to if o is not None else to - for o, to in zip(offsets or [None] * n, trash_offsets) # type: ignore - ] + if self._channel_waste_positions is None: + raise RuntimeError("Setup has not been run. Call LiquidHandler.setup() first.") + + waste_spots = [self._channel_waste_positions[c] for c in use_channels] + if all(s is waste_spots[0] for s in waste_spots): + trash = waste_spots[0] + trash_offsets = get_tight_single_resource_liquid_op_offsets( + trash, + num_channels=n, + ) + offsets = [ + o + to if o is not None else to + for o, to in zip(offsets or [None] * n, trash_offsets) # type: ignore + ] + return await self.drop_tips( + tip_spots=[trash] * n, + use_channels=use_channels, + offsets=offsets, + allow_nonzero_volume=allow_nonzero_volume, + **backend_kwargs, + ) return await self.drop_tips( - tip_spots=[trash] * n, + tip_spots=waste_spots, use_channels=use_channels, offsets=offsets, allow_nonzero_volume=allow_nonzero_volume, diff --git a/pylabrobot/liquid_handling/liquid_handler_tests.py b/pylabrobot/liquid_handling/liquid_handler_tests.py index d31cfed45d0..55d25eb4c42 100644 --- a/pylabrobot/liquid_handling/liquid_handler_tests.py +++ b/pylabrobot/liquid_handling/liquid_handler_tests.py @@ -14,7 +14,10 @@ Strictness, set_strictness, ) -from pylabrobot.liquid_handling.utils import get_tight_single_resource_liquid_op_offsets +from pylabrobot.liquid_handling.utils import ( + get_tight_single_resource_liquid_op_offsets, + get_waste_positions_for_n_channels, +) from pylabrobot.resources import ( PLT_CAR_L5AC_A00, TIP_CAR_480_A00, @@ -473,12 +476,43 @@ def test_serialize(self): ) +class TestGetWastePositionsForNChannels(unittest.TestCase): + """Tests for get_waste_positions_for_n_channels helper.""" + + def test_single_position_repeats_to_n(self): + deck = STARLetDeck(waste_positions=None) + positions = deck.get_waste_positions() + self.assertEqual(len(positions), 1) + result = get_waste_positions_for_n_channels(positions, 8) + self.assertEqual(len(result), 8) + self.assertTrue(all(r is positions[0] for r in result)) + + def test_sixteen_positions_subset_to_eight(self): + deck = STARLetDeck() + positions = deck.get_waste_positions() + self.assertEqual(len(positions), 16) + result = get_waste_positions_for_n_channels(positions, 8) + self.assertEqual(len(result), 8) + self.assertEqual( + [r.name for r in result], + [f"waste_position_{2 * i + 1}" for i in range(8)], + ) + + def test_n_greater_than_positions_raises(self): + deck = STARLetDeck() + positions = deck.get_waste_positions() + with self.assertRaises(ValueError) as ctx: + get_waste_positions_for_n_channels(positions, 20) + self.assertIn("Requested 20", str(ctx.exception)) + self.assertIn("only has 16", str(ctx.exception)) + + class TestLiquidHandlerCommands(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self.maxDiff = None self.backend = _create_mock_backend(num_channels=8) - self.deck = STARLetDeck() + self.deck = STARLetDeck(waste_positions=None) self.lh = LiquidHandler(backend=self.backend, deck=self.deck) self.tip_rack = hamilton_96_tiprack_300uL_filter(name="tip_rack") @@ -487,6 +521,51 @@ async def asyncSetUp(self): self.deck.assign_child_resource(self.plate, location=Coordinate(100, 100, 0)) await self.lh.setup() + async def test_channel_waste_positions_set_at_setup(self): + """After setup, _channel_waste_positions has length backend.num_channels.""" + positions = self.lh._channel_waste_positions + self.assertIsNotNone(positions) + assert positions is not None # for mypy + self.assertEqual(len(positions), 8) + # Single trash deck: all entries are the same trash + self.assertTrue(all(r is positions[0] for r in positions)) + + async def test_discard_tips_before_setup_raises(self): + """discard_tips before setup raises a clear error.""" + backend = _create_mock_backend(num_channels=8) + deck = STARLetDeck(waste_positions=None) + lh = LiquidHandler(backend=backend, deck=deck) + with self.assertRaises(RuntimeError) as ctx: + await lh.discard_tips(use_channels=[0]) + self.assertIn("Setup has not been run", str(ctx.exception)) + + async def test_discard_tips_uses_per_channel_waste_positions(self): + """With addressable waste, discard_tips sends each channel to its assigned waste position.""" + deck = STARLetDeck() # 16 addressable waste positions + backend = _create_mock_backend(num_channels=8) + lh = LiquidHandler(backend=backend, deck=deck) + tip_rack = hamilton_96_tiprack_300uL_filter(name="tip_rack") + deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0)) + await lh.setup() + + # Pick up on channels 0, 2, 5 only + await lh.pick_up_tips( + tip_rack["A1", "C1", "F1"], + use_channels=[0, 2, 5], + ) + backend.drop_tips.reset_mock() + await lh.discard_tips(use_channels=[0, 2, 5]) + + # Each channel should go to its pre-assigned waste position (from setup), not first 3 + call = backend.drop_tips.call_args + ops = call.kwargs["ops"] + use_channels = call.kwargs["use_channels"] + self.assertEqual(use_channels, [0, 2, 5]) + self.assertEqual(len(ops), 3) + # With 8 channels over 16 positions: channel 0 -> waste_position_1, 2 -> waste_position_5, 5 -> waste_position_11 + expected_names = ["waste_position_1", "waste_position_5", "waste_position_11"] + self.assertEqual([op.resource.name for op in ops], expected_names) + async def test_offsets_tips(self): tip_spot = self.tip_rack.get_item("A1") tip = tip_spot.get_tip() diff --git a/pylabrobot/liquid_handling/utils.py b/pylabrobot/liquid_handling/utils.py index 43997c75b9d..6a73971761c 100644 --- a/pylabrobot/liquid_handling/utils.py +++ b/pylabrobot/liquid_handling/utils.py @@ -2,6 +2,7 @@ from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.resource import Resource +from pylabrobot.resources.trash import Trash MIN_SPACING_BETWEEN_CHANNELS = 9.0 # minimum spacing between the edge of the container and the center of channel @@ -68,3 +69,14 @@ def get_tight_single_resource_liquid_op_offsets( ) for c in centers ] + + +def get_waste_positions_for_n_channels(positions: List[Trash], n: int) -> List[Trash]: + """Return n waste positions from the deck's list: repeat if single, else subset evenly.""" + if len(positions) == 1: + return [positions[0]] * n + if n > len(positions): + raise ValueError(f"Requested {n} waste positions but deck only has {len(positions)}.") + M = len(positions) + indices = [i * (M // n) for i in range(n)] + return [positions[j] for j in indices] diff --git a/pylabrobot/resources/deck.py b/pylabrobot/resources/deck.py index f2805bc7038..faa924feb45 100644 --- a/pylabrobot/resources/deck.py +++ b/pylabrobot/resources/deck.py @@ -132,3 +132,7 @@ def summary(self) -> str: def get_trash_area96(self) -> Trash: deck_class = self.__class__.__name__ raise NotImplementedError(f"This method is not implemented by deck '{deck_class}'") + + def get_waste_positions(self) -> List[Trash]: + """Return the list of waste positions on this deck. Default: single trash area.""" + return [cast(Trash, self.get_trash_area())] diff --git a/pylabrobot/resources/hamilton/hamilton_deck_tests.py b/pylabrobot/resources/hamilton/hamilton_deck_tests.py index 3d7998a8de1..f4e152a82f7 100644 --- a/pylabrobot/resources/hamilton/hamilton_deck_tests.py +++ b/pylabrobot/resources/hamilton/hamilton_deck_tests.py @@ -64,6 +64,22 @@ def test_summary(self): │ (31) ├── waste_block Resource (775.000, 115.000, 100.000) │ ├── teaching_tip_rack TipRack (780.900, 461.100, 100.000) + │ ├── waste_position_1 Trash (800.000, 405.000, 187.000) + │ ├── waste_position_2 Trash (800.000, 392.500, 187.000) + │ ├── waste_position_3 Trash (800.000, 380.000, 187.000) + │ ├── waste_position_4 Trash (800.000, 367.500, 187.000) + │ ├── waste_position_5 Trash (800.000, 355.000, 187.000) + │ ├── waste_position_6 Trash (800.000, 342.500, 187.000) + │ ├── waste_position_7 Trash (800.000, 330.000, 187.000) + │ ├── waste_position_8 Trash (800.000, 317.500, 187.000) + │ ├── waste_position_9 Trash (800.000, 305.000, 187.000) + │ ├── waste_position_10 Trash (800.000, 292.500, 187.000) + │ ├── waste_position_11 Trash (800.000, 280.000, 187.000) + │ ├── waste_position_12 Trash (800.000, 267.500, 187.000) + │ ├── waste_position_13 Trash (800.000, 255.000, 187.000) + │ ├── waste_position_14 Trash (800.000, 242.500, 187.000) + │ ├── waste_position_15 Trash (800.000, 230.000, 187.000) + │ ├── waste_position_16 Trash (800.000, 217.500, 187.000) │ ├── core_grippers HamiltonCoreGrippers (797.500, 085.500, 205.000) │ (32) ├── trash Trash (800.000, 190.600, 137.100) @@ -71,6 +87,20 @@ def test_summary(self): ), ) + def test_get_waste_positions_default_returns_16(self): + """Default STARLetDeck has 16 addressable waste positions.""" + deck = STARLetDeck() + positions = deck.get_waste_positions() + self.assertEqual(len(positions), 16) + self.assertEqual([p.name for p in positions], [f"waste_position_{i}" for i in range(1, 17)]) + + def test_get_waste_positions_none_returns_single_trash(self): + """STARLetDeck(waste_positions=None) returns single trash (same as get_trash_area).""" + deck = STARLetDeck(waste_positions=None) + positions = deck.get_waste_positions() + self.assertEqual(len(positions), 1) + self.assertIs(positions[0], deck.get_trash_area()) + def test_assign_gigantic_resource(self): stanley_cup = StanleyCup_QUENCHER_FLOWSTATE_TUMBLER(name="HUGE") deck = STARLetDeck() diff --git a/pylabrobot/resources/hamilton/hamilton_decks.py b/pylabrobot/resources/hamilton/hamilton_decks.py index 4f7acc2ec8b..548b97d73a9 100644 --- a/pylabrobot/resources/hamilton/hamilton_decks.py +++ b/pylabrobot/resources/hamilton/hamilton_decks.py @@ -2,7 +2,7 @@ import logging from abc import ABCMeta, abstractmethod -from typing import Literal, Optional, cast +from typing import List, Literal, Optional, cast from pylabrobot.resources.carrier import ResourceHolder from pylabrobot.resources.coordinate import Coordinate @@ -28,6 +28,11 @@ STAR_SIZE_Y = 653.5 STAR_SIZE_Z = 900 +# Default waste tip positions for STAR/STARlet (x=800, y 405→217.5 mm, z=187). +STAR_DEFAULT_WASTE_POSITIONS: List[Coordinate] = [ + Coordinate(x=800.0, y=405.0 - i * 12.5, z=187.0) for i in range(16) +] + def rails_for_x_coordinate(x: float) -> int: """Convert an x coordinate to a rail identifier.""" @@ -442,6 +447,7 @@ def __init__( core_grippers: Optional[ Literal["1000uL-at-waste", "1000uL-5mL-on-waste"] ] = "1000uL-5mL-on-waste", + waste_positions: Optional[List[Coordinate]] = None, ) -> None: """Create a new STAR(let) deck of the given size. @@ -516,11 +522,31 @@ def __init__( waste_block.assign_child_resource( teaching_tip_rack, location=Coordinate(x=5.9, y=346.1, z=0) ) + if waste_positions is not None: + wb_loc = waste_block.get_location_wrt(self) + for i, pos in enumerate(waste_positions, start=1): + pos_rel = Coordinate( + x=pos.x - wb_loc.x, + y=pos.y - wb_loc.y, + z=pos.z - wb_loc.z, + ) + waste_block.assign_child_resource( + Trash( + name=f"waste_position_{i}", + size_x=0.0, + size_y=0.0, + size_z=0.0, + category="waste_position", + ), + location=pos_rel, + ) else: if with_trash: raise RuntimeError("Trash area cannot be created when no waste block is present.") if with_teaching_rack: raise RuntimeError("Teaching rack cannot be created when no waste block is present.") + if waste_positions is not None: + raise RuntimeError("Waste positions cannot be created when no waste block is present.") if core_grippers == "1000uL-at-waste": # "at waste" x: float = 1338 if num_rails == STAR_NUM_RAILS else 798 @@ -554,6 +580,18 @@ def get_trash_area96(self) -> Trash: ) return self._trash96 + def get_waste_positions(self) -> List[Trash]: + """Return the list of waste positions: 16 addressable spots or single trash.""" + M = 0 + for i in range(1, 17): + if self.has_resource(f"waste_position_{i}"): + M += 1 + else: + break + if M == 0: + return [cast(Trash, self.get_trash_area())] + return [cast(Trash, self.get_resource(f"waste_position_{i}")) for i in range(1, M + 1)] + def clear(self, include_trash: bool = False): """Clear the deck, removing all resources except the trash areas and the waste block.""" children_names = [child.name for child in self.children] @@ -574,6 +612,7 @@ def STARLetDeck( core_grippers: Optional[ Literal["1000uL-at-waste", "1000uL-5mL-on-waste"] ] = "1000uL-5mL-on-waste", + waste_positions: Optional[List[Coordinate]] = STAR_DEFAULT_WASTE_POSITIONS, ) -> HamiltonSTARDeck: """Create a new STARLet deck. @@ -590,6 +629,7 @@ def STARLetDeck( with_trash96=with_trash96, with_teaching_rack=with_teaching_rack, core_grippers=core_grippers, + waste_positions=waste_positions, ) @@ -601,6 +641,9 @@ def STARDeck( core_grippers: Optional[ Literal["1000uL-at-waste", "1000uL-5mL-on-waste"] ] = "1000uL-5mL-on-waste", + waste_positions: Optional[ + List[Coordinate] + ] = None, # TODO: pretty sure this can be the same as Starlet, but have not confirmed with real hardware. ) -> HamiltonSTARDeck: """Create a new STAR deck. @@ -617,4 +660,5 @@ def STARDeck( with_trash96=with_trash96, with_teaching_rack=with_teaching_rack, core_grippers=core_grippers, + waste_positions=waste_positions, )