diff --git a/docs/_static/devices.json b/docs/_static/devices.json
index 05757bd8af1..f950490f658 100644
--- a/docs/_static/devices.json
+++ b/docs/_static/devices.json
@@ -654,7 +654,7 @@
"luminescence"
],
"status": "Mostly",
- "docs": "/user_guide/02_analytical/plate-reading/tecan-infinite.html",
+ "docs": "/user_guide/tecan/infinite/hello-world.html",
"oem": "https://lifesciences.tecan.com/infinite-200-pro"
},
{
diff --git a/docs/api/pylabrobot.rst b/docs/api/pylabrobot.rst
index 72f1969330a..53a04635937 100644
--- a/docs/api/pylabrobot.rst
+++ b/docs/api/pylabrobot.rst
@@ -44,4 +44,5 @@ Manufacturers
pylabrobot.molecular_devices
pylabrobot.opentrons
pylabrobot.qinstruments
+ pylabrobot.tecan
pylabrobot.thermo_fisher
diff --git a/docs/api/pylabrobot.tecan.rst b/docs/api/pylabrobot.tecan.rst
new file mode 100644
index 00000000000..175fe540c77
--- /dev/null
+++ b/docs/api/pylabrobot.tecan.rst
@@ -0,0 +1,55 @@
+.. currentmodule:: pylabrobot.tecan
+
+pylabrobot.tecan package
+========================
+
+Infinite 200 PRO
+-----------------
+
+.. currentmodule:: pylabrobot.tecan.infinite.infinite
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ TecanInfinite200Pro
+
+.. currentmodule:: pylabrobot.tecan.infinite.driver
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ TecanInfiniteDriver
+
+.. currentmodule:: pylabrobot.tecan.infinite.absorbance_backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ TecanInfiniteAbsorbanceBackend
+ TecanInfiniteAbsorbanceParams
+
+.. currentmodule:: pylabrobot.tecan.infinite.fluorescence_backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ TecanInfiniteFluorescenceBackend
+ TecanInfiniteFluorescenceParams
+
+.. currentmodule:: pylabrobot.tecan.infinite.luminescence_backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ TecanInfiniteLuminescenceBackend
+ TecanInfiniteLuminescenceParams
diff --git a/docs/user_guide/machines.md b/docs/user_guide/machines.md
index 759f47694fe..dc057d79b15 100644
--- a/docs/user_guide/machines.md
+++ b/docs/user_guide/machines.md
@@ -167,7 +167,7 @@ tr > td:nth-child(5) { width: 15%; }
| Molecular Devices | SpectraMax M5e | absorbancefluorescence time-resolved fluorescencefluorescence polarization | Full | [OEM](https://www.moleculardevices.com/products/microplate-readers/multi-mode-readers/spectramax-m-series-readers) |
| Molecular Devices | SpectraMax 384plus | absorbance | Full | [OEM](https://www.moleculardevices.com/products/microplate-readers/absorbance-readers/spectramax-abs-plate-readers) |
| Molecular Devices | ImageXpress Pico | microscopy | Basics | [PLR](02_analytical/plate-reading/pico.ipynb) / [OEM](https://www.moleculardevices.com/products/cellular-imaging-systems/high-content-imaging/imagexpress-pico) |
-| Tecan | Infinite 200 PRO | absorbancefluorescenceluminescence | Mostly | [PLR](02_analytical/plate-reading/tecan-infinite.ipynb) / [OEM](https://lifesciences.tecan.com/infinite-200-pro) |
+| Tecan | Infinite 200 PRO | absorbancefluorescenceluminescence | Mostly | [PLR](tecan/infinite/hello-world.ipynb) / [OEM](https://lifesciences.tecan.com/infinite-200-pro) |
### Flow Cytometers
diff --git a/docs/user_guide/tecan/index.md b/docs/user_guide/tecan/index.md
new file mode 100644
index 00000000000..5911f1bca86
--- /dev/null
+++ b/docs/user_guide/tecan/index.md
@@ -0,0 +1,7 @@
+# Tecan
+
+```{toctree}
+:maxdepth: 1
+
+infinite/hello-world
+```
diff --git a/docs/user_guide/tecan/infinite/hello-world.ipynb b/docs/user_guide/tecan/infinite/hello-world.ipynb
new file mode 100644
index 00000000000..d91ff1af75b
--- /dev/null
+++ b/docs/user_guide/tecan/infinite/hello-world.ipynb
@@ -0,0 +1,132 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "# Tecan Infinite 200 PRO\n\nThe Tecan Infinite 200 PRO is a multimode microplate reader that supports:\n\n- [Absorbance](../../capabilities/absorbance) (230–1000 nm)\n- [Fluorescence](../../capabilities/fluorescence) (230–850 nm excitation/emission)\n- [Luminescence](../../capabilities/luminescence)\n\nThis backend targets the Infinite \"M\" series (e.g., Infinite 200 PRO M Plex). The \"F\" series uses a different optical path and is not covered here."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "from pylabrobot.tecan.infinite import TecanInfinite200Pro\n\nreader = TecanInfinite200Pro(name=\"reader\")\nawait reader.setup()"
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "await reader.loading_tray.open()"
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "Before closing, assign a plate to the reader. This determines the well positions for measurements."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "from pylabrobot.resources import Cor_96_wellplate_360ul_Fb\nplate = Cor_96_wellplate_360ul_Fb(name=\"plate\")\nreader.loading_tray.assign_child_resource(plate)"
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "await reader.loading_tray.close()"
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "## Absorbance\n\nRead absorbance at a specified wavelength (230–1000 nm). For the full API, see [Absorbance](../../capabilities/absorbance)."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "results = await reader.absorbance.read(plate=plate, wavelength=450)\nresults[0].data # 2D array indexed [row][col]"
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "### Backend-specific parameters\n\nUse {class}`~pylabrobot.tecan.infinite.TecanInfiniteAbsorbanceParams` to configure flashes and bandwidth."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "from pylabrobot.tecan.infinite import TecanInfiniteAbsorbanceParams\n\nresults = await reader.absorbance.read(\n plate=plate,\n wavelength=450,\n backend_params=TecanInfiniteAbsorbanceParams(flashes=50, bandwidth=9.0),\n)"
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "## Fluorescence\n\nRead fluorescence with specified excitation and emission wavelengths (230–850 nm). The focal height is in millimeters. For the full API, see [Fluorescence](../../capabilities/fluorescence)."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "results = await reader.fluorescence.read(\n plate=plate,\n excitation_wavelength=485,\n emission_wavelength=528,\n focal_height=20.0,\n)\nresults[0].data"
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "## Luminescence\n\nRead luminescence. The focal height is in millimeters. For the full API, see [Luminescence](../../capabilities/luminescence)."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "results = await reader.luminescence.read(plate=plate, focal_height=20.0)\nresults[0].data"
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "## Reading specific wells\n\nYou can specify a subset of wells to read instead of the entire plate."
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": "wells = plate.get_items([\"A1\", \"A2\", \"B1\", \"B2\"])\nresults = await reader.absorbance.read(plate=plate, wavelength=450, wells=wells)"
+ },
+ {
+ "cell_type": "markdown",
+ "source": "## Teardown",
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": "await reader.stop()",
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": "## Installation\n\nThe Infinite 200 PRO connects via USB. PyLabRobot uses `pyusb` for communication, which requires `libusb` on your system.\n\n### macOS\n\n```bash\nbrew install libusb\n```\n\n### Linux (Debian/Ubuntu)\n\n```bash\nsudo apt-get install libusb-1.0-0-dev\n```\n\n### Windows\n\nInstall [Zadig](https://zadig.akeo.ie/) and replace the Infinite's default USB driver with `WinUSB` or `libusb-win32`.",
+ "metadata": {}
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "name": "python",
+ "version": "3.10.0"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git a/pylabrobot/legacy/plate_reading/tecan/infinite_backend.py b/pylabrobot/legacy/plate_reading/tecan/infinite_backend.py
index 06c6e08ca5c..496c29401ad 100644
--- a/pylabrobot/legacy/plate_reading/tecan/infinite_backend.py
+++ b/pylabrobot/legacy/plate_reading/tecan/infinite_backend.py
@@ -1,504 +1,84 @@
"""Tecan Infinite 200 PRO backend.
-This backend targets the Infinite "M" series (e.g., Infinite 200 PRO). The
-"F" series uses a different optical path and is not covered here.
+Legacy wrapper. Use :class:`pylabrobot.tecan.infinite.TecanInfinite200Pro` instead.
+
+This module delegates to the new Device/Driver/CapabilityBackend architecture
+while preserving the legacy ``PlateReaderBackend`` API and all internal symbols
+imported by existing tests.
"""
from __future__ import annotations
-import asyncio
import logging
-import math
-import re
-import time
-from abc import ABC, abstractmethod
-from dataclasses import dataclass
-from typing import Dict, List, Optional, Sequence, Tuple
+from typing import Dict, List, Optional
-from pylabrobot.io.binary import Reader
-from pylabrobot.io.usb import USB
+from pylabrobot.io.usb import USB  # noqa: F401  # test patches this import location
from pylabrobot.legacy.plate_reading.backend import PlateReaderBackend
from pylabrobot.resources import Plate
from pylabrobot.resources.well import Well
+from pylabrobot.tecan.infinite.absorbance_backend import (
+ TecanInfiniteAbsorbanceBackend,
+ TecanInfiniteAbsorbanceParams,
+)
+from pylabrobot.tecan.infinite.driver import TecanInfiniteDriver
+from pylabrobot.tecan.infinite.fluorescence_backend import (
+ TecanInfiniteFluorescenceBackend,
+ TecanInfiniteFluorescenceParams,
+)
+from pylabrobot.tecan.infinite.luminescence_backend import (
+ TecanInfiniteLuminescenceBackend,
+ TecanInfiniteLuminescenceParams,
+)
+
+# Re-export protocol symbols so existing test imports continue to work.
+from pylabrobot.tecan.infinite.protocol import ( # noqa: F401
+ BIN_RE,
+ StagePosition,
+ _AbsorbanceCalibration,
+ _AbsorbanceCalibrationItem,
+ _AbsorbanceMeasurement,
+ _AbsorbanceRunDecoder,
+ _FluorescenceCalibration,
+ _FluorescenceRunDecoder,
+ _LuminescenceCalibration,
+ _LuminescenceMeasurement,
+ _LuminescenceRunDecoder,
+ _MeasurementDecoder,
+ _StreamEvent,
+ _StreamParser,
+ _absorbance_od_calibrated,
+ _consume_leading_ascii_frame,
+ _consume_status_frame,
+ _decode_abs_calibration,
+ _decode_abs_data,
+ _decode_flr_calibration,
+ _decode_flr_data,
+ _decode_lum_calibration,
+ _decode_lum_data,
+ _fluorescence_corrected,
+ _integration_microseconds_to_seconds,
+ _is_abs_calibration_len,
+ _is_abs_data_len,
+ _luminescence_intensity,
+ _split_payload_and_trailer,
+ format_plate_result,
+ frame_command,
+ is_terminal_frame,
+)
logger = logging.getLogger(__name__)
-BIN_RE = re.compile(r"^(\d+),BIN:$")
-
-
-def _integration_microseconds_to_seconds(value: int) -> float:
- # DLL/UI indicates integration time is stored in microseconds; UI displays ms by dividing by 1000.
- return value / 1_000_000.0
-
-
-def _is_abs_calibration_len(payload_len: int) -> bool:
- return payload_len >= 22 and (payload_len - 4) % 18 == 0
-
-
-def _is_abs_data_len(payload_len: int) -> bool:
- return payload_len >= 14 and (payload_len - 4) % 10 == 0
-
-
-def _split_payload_and_trailer(
- payload_len: int, blob: bytes
-) -> Optional[Tuple[bytes, Tuple[int, int]]]:
- if len(blob) != payload_len + 4:
- return None
- payload = blob[:payload_len]
- trailer_reader = Reader(blob[payload_len:], little_endian=False)
- return payload, (trailer_reader.u16(), trailer_reader.u16())
-
-
-@dataclass(frozen=True)
-class _AbsorbanceCalibrationItem:
- ticker_overflows: int
- ticker_counter: int
- meas_gain: int
- meas_dark: int
- meas_bright: int
- ref_gain: int
- ref_dark: int
- ref_bright: int
-
-
-@dataclass(frozen=True)
-class _AbsorbanceCalibration:
- ex: int
- items: List[_AbsorbanceCalibrationItem]
-
-
-def _decode_abs_calibration(payload_len: int, blob: bytes) -> Optional[_AbsorbanceCalibration]:
- split = _split_payload_and_trailer(payload_len, blob)
- if split is None:
- return None
- payload, _ = split
- if len(payload) < 4 + 18:
- return None
- if (len(payload) - 4) % 18 != 0:
- return None
- reader = Reader(payload, little_endian=False)
- reader.raw_bytes(2) # skip first 2 bytes
- ex = reader.u16()
- items: List[_AbsorbanceCalibrationItem] = []
- while reader.has_remaining():
- items.append(
- _AbsorbanceCalibrationItem(
- ticker_overflows=reader.u32(),
- ticker_counter=reader.u16(),
- meas_gain=reader.u16(),
- meas_dark=reader.u16(),
- meas_bright=reader.u16(),
- ref_gain=reader.u16(),
- ref_dark=reader.u16(),
- ref_bright=reader.u16(),
- )
- )
- return _AbsorbanceCalibration(ex=ex, items=items)
-
-
-def _decode_abs_data(
- payload_len: int, blob: bytes
-) -> Optional[Tuple[int, int, List[Tuple[int, int]]]]:
- split = _split_payload_and_trailer(payload_len, blob)
- if split is None:
- return None
- payload, _ = split
- if len(payload) < 4:
- return None
- reader = Reader(payload, little_endian=False)
- label = reader.u16()
- ex = reader.u16()
- items: List[Tuple[int, int]] = []
- while reader.offset() + 10 <= len(payload):
- reader.raw_bytes(6) # skip first 6 bytes of each item
- meas = reader.u16()
- ref = reader.u16()
- items.append((meas, ref))
- if reader.offset() != len(payload):
- return None
- return label, ex, items
-
-
-def _absorbance_od_calibrated(
- cal: _AbsorbanceCalibration, meas_ref_items: List[Tuple[int, int]], od_max: float = 4.0
-) -> float:
- if not cal.items:
- raise ValueError("ABS calibration packet contained no calibration items.")
-
- min_corr_trans = math.pow(10.0, -od_max)
-
- if len(cal.items) == len(meas_ref_items) and len(cal.items) > 1:
- corr_trans_vals: List[float] = []
- for (meas, ref), cal_item in zip(meas_ref_items, cal.items):
- denom_corr = cal_item.meas_bright - cal_item.meas_dark
- if denom_corr == 0:
- continue
- f_corr = (cal_item.ref_bright - cal_item.ref_dark) / denom_corr
- denom = ref - cal_item.ref_dark
- if denom == 0:
- continue
- corr_trans_vals.append(((meas - cal_item.meas_dark) / denom) * f_corr)
- if not corr_trans_vals:
- raise ZeroDivisionError("ABS invalid: no usable reads after per-read calibration.")
- corr_trans = max(sum(corr_trans_vals) / len(corr_trans_vals), min_corr_trans)
- return float(-math.log10(corr_trans))
-
- cal0 = cal.items[0]
- denom_corr = cal0.meas_bright - cal0.meas_dark
- if denom_corr == 0:
- raise ZeroDivisionError("ABS calibration invalid: meas_bright == meas_dark")
- f_corr = (cal0.ref_bright - cal0.ref_dark) / denom_corr
-
- trans_vals: List[float] = []
- for meas, ref in meas_ref_items:
- denom = ref - cal0.ref_dark
- if denom == 0:
- continue
- trans_vals.append((meas - cal0.meas_dark) / denom)
- if not trans_vals:
- raise ZeroDivisionError("ABS invalid: all ref reads equal ref_dark")
-
- trans_mean = sum(trans_vals) / len(trans_vals)
- corr_trans = max(trans_mean * f_corr, min_corr_trans)
- return float(-math.log10(corr_trans))
-
-
-@dataclass(frozen=True)
-class _FluorescenceCalibration:
- ex: int
- meas_dark: int
- ref_dark: int
- ref_bright: int
-
-
-def _decode_flr_calibration(payload_len: int, blob: bytes) -> Optional[_FluorescenceCalibration]:
- split = _split_payload_and_trailer(payload_len, blob)
- if split is None:
- return None
- payload, _ = split
- if len(payload) != 18:
- return None
- reader = Reader(payload, little_endian=False)
- ex = reader.u16()
- reader.raw_bytes(8) # skip bytes 2-9
- meas_dark = reader.u16()
- reader.raw_bytes(2) # skip bytes 12-13
- ref_dark = reader.u16()
- ref_bright = reader.u16()
- return _FluorescenceCalibration(
- ex=ex,
- meas_dark=meas_dark,
- ref_dark=ref_dark,
- ref_bright=ref_bright,
- )
-
-
-def _decode_flr_data(
- payload_len: int, blob: bytes
-) -> Optional[Tuple[int, int, int, List[Tuple[int, int]]]]:
- split = _split_payload_and_trailer(payload_len, blob)
- if split is None:
- return None
- payload, _ = split
- if len(payload) < 6:
- return None
- reader = Reader(payload, little_endian=False)
- label = reader.u16()
- ex = reader.u16()
- em = reader.u16()
- items: List[Tuple[int, int]] = []
- while reader.offset() + 10 <= len(payload):
- reader.raw_bytes(6) # skip first 6 bytes of each item
- meas = reader.u16()
- ref = reader.u16()
- items.append((meas, ref))
- if reader.offset() != len(payload):
- return None
- return label, ex, em, items
-
-
-def _fluorescence_corrected(
- cal: _FluorescenceCalibration, meas_ref_items: List[Tuple[int, int]]
-) -> int:
- if not meas_ref_items:
- return 0
- meas_mean = sum(m for m, _ in meas_ref_items) / len(meas_ref_items)
- ref_mean = sum(r for _, r in meas_ref_items) / len(meas_ref_items)
- denom = ref_mean - cal.ref_dark
- if denom == 0:
- return 0
- corr = (meas_mean - cal.meas_dark) * (cal.ref_bright - cal.ref_dark) / denom
- return int(round(corr))
-
-
-@dataclass(frozen=True)
-class _LuminescenceCalibration:
- ref_dark: int
-
-
-def _decode_lum_calibration(payload_len: int, blob: bytes) -> Optional[_LuminescenceCalibration]:
- split = _split_payload_and_trailer(payload_len, blob)
- if split is None:
- return None
- payload, _ = split
- if len(payload) != 10:
- return None
- reader = Reader(payload, little_endian=False)
- reader.raw_bytes(6) # skip bytes 0-5
- return _LuminescenceCalibration(ref_dark=reader.i32())
-
-
-def _decode_lum_data(payload_len: int, blob: bytes) -> Optional[Tuple[int, int, List[int]]]:
- split = _split_payload_and_trailer(payload_len, blob)
- if split is None:
- return None
- payload, _ = split
- if len(payload) < 4:
- return None
- reader = Reader(payload, little_endian=False)
- label = reader.u16()
- em = reader.u16()
- counts: List[int] = []
- while reader.offset() + 10 <= len(payload):
- reader.raw_bytes(6) # skip first 6 bytes of each item
- counts.append(reader.i32())
- if reader.offset() != len(payload):
- return None
- return label, em, counts
-
-
-def _luminescence_intensity(
- cal: _LuminescenceCalibration,
- counts: List[int],
- dark_integration_s: float,
- meas_integration_s: float,
-) -> int:
- if not counts:
- return 0
- if dark_integration_s == 0 or meas_integration_s == 0:
- return 0
- count_mean = sum(counts) / len(counts)
- corrected_rate = (count_mean / meas_integration_s) - (cal.ref_dark / dark_integration_s)
- return int(corrected_rate)
-
-
-StagePosition = Tuple[int, int]
-
-
-def _consume_leading_ascii_frame(buffer: bytearray) -> Tuple[bool, Optional[str]]:
- """Remove a leading STX...ETX ASCII frame if present."""
-
- if not buffer or buffer[0] != 0x02:
- return False, None
- end = buffer.find(b"\x03", 1)
- if end == -1:
- return False, None
- # Payload is followed by a 4-byte trailer and optional CR.
- if len(buffer) < end + 5:
- return False, None
- text = buffer[1:end].decode("ascii", "ignore")
- del buffer[: end + 5]
- if buffer and buffer[0] == 0x0D:
- del buffer[0]
- return True, text
-
-
-def _consume_status_frame(buffer: bytearray, length: int) -> bool:
- """Drop a leading ESC-prefixed status frame if present."""
-
- if len(buffer) >= length and buffer[0] == 0x1B:
- del buffer[:length]
- return True
- return False
-
-
-@dataclass
-class _StreamEvent:
- """Parsed stream event (ASCII or binary)."""
-
- text: Optional[str] = None
- payload_len: Optional[int] = None
- blob: Optional[bytes] = None
-
-
-class _StreamParser:
- """Parse mixed ASCII and binary packets from the reader."""
-
- def __init__(
- self,
- *,
- status_frame_len: Optional[int] = None,
- allow_bare_ascii: bool = False,
- ) -> None:
- """Initialize the stream parser."""
- self._buffer = bytearray()
- self._pending_bin: Optional[int] = None
- self._status_frame_len = status_frame_len
- self._allow_bare_ascii = allow_bare_ascii
-
- def has_pending_bin(self) -> bool:
- """Return True if a binary payload length is pending."""
- return self._pending_bin is not None
-
- def feed(self, chunk: bytes) -> List[_StreamEvent]:
- """Feed raw bytes and return newly parsed events."""
- self._buffer.extend(chunk)
- events: List[_StreamEvent] = []
- progressed = True
- while progressed:
- progressed = False
- if self._pending_bin is not None:
- need = self._pending_bin + 4
- if len(self._buffer) < need:
- break
- blob = bytes(self._buffer[:need])
- del self._buffer[:need]
- events.append(_StreamEvent(payload_len=self._pending_bin, blob=blob))
- self._pending_bin = None
- progressed = True
- continue
- if self._status_frame_len and _consume_status_frame(self._buffer, self._status_frame_len):
- progressed = True
- continue
- consumed, text = _consume_leading_ascii_frame(self._buffer)
- if consumed:
- events.append(_StreamEvent(text=text))
- if text:
- m = BIN_RE.match(text)
- if m:
- self._pending_bin = int(m.group(1))
- progressed = True
- continue
- if self._allow_bare_ascii and self._buffer and all(32 <= b <= 126 for b in self._buffer):
- text = self._buffer.decode("ascii", "ignore")
- self._buffer.clear()
- events.append(_StreamEvent(text=text))
- progressed = True
- continue
- return events
-
-
-class _MeasurementDecoder(ABC):
- """Shared incremental decoder for Infinite measurement streams."""
-
- STATUS_FRAME_LEN: Optional[int] = None
-
- def __init__(self, expected: int) -> None:
- """Initialize decoder state for a scan with expected measurements."""
- self.expected = expected
- self._terminal_seen = False
- self._parser = _StreamParser(status_frame_len=self.STATUS_FRAME_LEN)
-
- @property
- @abstractmethod
- def count(self) -> int:
- """Return number of decoded measurements so far."""
-
- @property
- def done(self) -> bool:
- """Return True if the decoder has seen all expected measurements."""
- return self.count >= self.expected
-
- def pop_terminal(self) -> bool:
- """Return and clear the terminal frame seen flag."""
- seen = self._terminal_seen
- self._terminal_seen = False
- return seen
-
- def feed(self, chunk: bytes) -> None:
- """Consume a raw chunk and update decoder state."""
- for event in self._parser.feed(chunk):
- if event.text is not None:
- if event.text == "ST":
- self._terminal_seen = True
- elif event.payload_len is not None and event.blob is not None:
- self.feed_bin(event.payload_len, event.blob)
-
- def feed_bin(self, payload_len: int, blob: bytes) -> None:
- """Handle a binary payload if the decoder expects one."""
- if self._should_consume_bin(payload_len):
- self._handle_bin(payload_len, blob)
-
- def _should_consume_bin(self, _payload_len: int) -> bool:
- return False
-
- def _handle_bin(self, _payload_len: int, _blob: bytes) -> None:
- return None
class ExperimentalTecanInfinite200ProBackend(PlateReaderBackend):
- """Backend shell for the Infinite 200 PRO."""
+ """Legacy wrapper around the new Tecan Infinite architecture.
+
+ Use :class:`pylabrobot.tecan.infinite.TecanInfinite200Pro` for new code.
+ """
- _MODE_CAPABILITY_COMMANDS: Dict[str, List[str]] = {
- "ABS": [
- "#BEAM DIAMETER",
- # Additional capabilities available but currently unused:
- # "#EXCITATION WAVELENGTH",
- # "#EXCITATION USAGE",
- # "#EXCITATION NAME",
- # "#EXCITATION BANDWIDTH",
- # "#EXCITATION ATTENUATION",
- # "#EXCITATION DESCRIPTION",
- # "#TIME READDELAY",
- # "#SHAKING MODE",
- # "#SHAKING CONST.ORBITAL",
- # "#SHAKING AMPLITUDE",
- # "#SHAKING TIME",
- # "#SHAKING CONST.LINEAR",
- # "#TEMPERATURE PLATE",
- ],
- "FI.TOP": [
- # "#BEAM DIAMETER",
- # Additional capabilities available but currently unused:
- # "#EMISSION WAVELENGTH",
- # "#EMISSION USAGE",
- # "#EMISSION NAME",
- # "#EMISSION BANDWIDTH",
- # "#EMISSION ATTENUATION",
- # "#EMISSION DESCRIPTION",
- # "#EXCITATION WAVELENGTH",
- # "#EXCITATION USAGE",
- # "#EXCITATION NAME",
- # "#EXCITATION BANDWIDTH",
- # "#EXCITATION ATTENUATION",
- # "#EXCITATION DESCRIPTION",
- # "#TIME INTEGRATION",
- # "#TIME LAG",
- # "#TIME READDELAY",
- # "#GAIN VALUE",
- # "#READS SPEED",
- # "#READS NUMBER",
- # "#RANGES PMT,EXCITATION",
- # "#RANGES PMT,EMISSION",
- # "#POSITION FIL,Z",
- # "#TEMPERATURE PLATE",
- ],
- "FI.BOTTOM": [
- # "#BEAM DIAMETER",
- # Additional capabilities available but currently unused:
- # "#EMISSION WAVELENGTH",
- # "#EMISSION USAGE",
- # "#EXCITATION WAVELENGTH",
- # "#EXCITATION USAGE",
- # "#TIME INTEGRATION",
- # "#TIME LAG",
- # "#TIME READDELAY",
- ],
- "LUM": [
- # "#BEAM DIAMETER",
- # Additional capabilities available but currently unused:
- # "#EMISSION WAVELENGTH",
- # "#EMISSION USAGE",
- # "#EMISSION NAME",
- # "#EMISSION BANDWIDTH",
- # "#EMISSION ATTENUATION",
- # "#EMISSION DESCRIPTION",
- # "#TIME INTEGRATION",
- # "#TIME READDELAY",
- ],
- }
+ VENDOR_ID = TecanInfiniteDriver.VENDOR_ID
+ PRODUCT_ID = TecanInfiniteDriver.PRODUCT_ID
- VENDOR_ID = 0x0C47
- PRODUCT_ID = 0x8007
+ _MODE_CAPABILITY_COMMANDS = TecanInfiniteDriver._MODE_CAPABILITY_COMMANDS
def __init__(
self,
@@ -507,210 +87,113 @@ def __init__(
counts_per_mm_z: float = 1_000,
) -> None:
super().__init__()
- self.io = USB(
+ # Create USB here so that test patches on
+ # "pylabrobot.legacy.plate_reading.tecan.infinite_backend.USB"
+ # are picked up. Pass the io instance to the driver.
+ io = USB(
id_vendor=self.VENDOR_ID,
id_product=self.PRODUCT_ID,
human_readable_device_name="Tecan Infinite 200 PRO",
packet_read_timeout=3,
read_timeout=30,
)
+ self._driver = TecanInfiniteDriver(
+ counts_per_mm_x=counts_per_mm_x,
+ counts_per_mm_y=counts_per_mm_y,
+ counts_per_mm_z=counts_per_mm_z,
+ io=io,
+ )
+ self._absorbance = TecanInfiniteAbsorbanceBackend(self._driver)
+ self._fluorescence = TecanInfiniteFluorescenceBackend(self._driver)
+ self._luminescence = TecanInfiniteLuminescenceBackend(self._driver)
+
+ # Alias for direct attribute access (legacy code)
+ self.io = io
self.counts_per_mm_x = counts_per_mm_x
self.counts_per_mm_y = counts_per_mm_y
self.counts_per_mm_z = counts_per_mm_z
- self._setup_lock = asyncio.Lock()
- self._ready = False
- self._read_chunk_size = 512
- self._max_row_wait_s = 300.0
- self._mode_capabilities: Dict[str, Dict[str, str]] = {}
- self._pending_bin_events: List[Tuple[int, bytes]] = []
- self._parser = _StreamParser(allow_bare_ascii=True)
- self._run_active = False
- self._active_step_loss_commands: List[str] = []
- async def setup(self) -> None:
- async with self._setup_lock:
- if self._ready:
- return
- await self.io.setup()
- await self._initialize_device()
- for mode in self._MODE_CAPABILITY_COMMANDS:
- if mode not in self._mode_capabilities:
- await self._query_mode_capabilities(mode)
- self._ready = True
+ # -- state proxies for test compat --
- async def stop(self) -> None:
- async with self._setup_lock:
- if not self._ready:
- return
- await self._cleanup_protocol()
- await self.io.stop()
- self._mode_capabilities.clear()
- self._reset_stream_state()
- self._ready = False
+ @property
+ def _ready(self):
+ return self._driver._ready
- async def open(self) -> None:
- """Open the reader drawer."""
+ @_ready.setter
+ def _ready(self, value):
+ self._driver._ready = value
- await self._send_command("ABSOLUTE MTP,OUT")
- await self._send_command("BY#T5000")
+ @property
+ def _pending_bin_events(self):
+ return self._driver._pending_bin_events
- async def close(self, plate: Optional[Plate]) -> None: # noqa: ARG002
- """Close the reader drawer."""
+ @_pending_bin_events.setter
+ def _pending_bin_events(self, value):
+ self._driver._pending_bin_events = value
- await self._send_command("ABSOLUTE MTP,IN")
- await self._send_command("BY#T5000")
+ @property
+ def _mode_capabilities(self):
+ return self._driver._mode_capabilities
- async def _run_scan(
- self,
- ordered_wells: Sequence[Well],
- decoder: _MeasurementDecoder,
- mode: str,
- step_loss_commands: List[str],
- serpentine: bool,
- scan_direction: str,
- ) -> None:
- """Run the common scan loop for all measurement types.
+ @property
+ def _parser(self):
+ return self._driver._parser
- Args:
- ordered_wells: The wells to scan in row-major order.
- decoder: The decoder to use for parsing measurements.
- mode: The mode name for logging (e.g., "Absorbance").
- step_loss_commands: Commands to run after the scan to check for step loss.
- serpentine: Whether to use serpentine scan order.
- scan_direction: The scan direction command (e.g., "ALTUP", "UP").
- """
- self._active_step_loss_commands = step_loss_commands
+ @property
+ def _run_active(self):
+ return self._driver._run_active
- for row_index, row_wells in self._group_by_row(ordered_wells):
- start_x, end_x, count = self._scan_range(row_index, row_wells, serpentine=serpentine)
- _, y_stage = self._map_well_to_stage(row_wells[0])
+ @property
+ def _active_step_loss_commands(self):
+ return self._driver._active_step_loss_commands
- await self._send_command(f"ABSOLUTE MTP,Y={y_stage}")
- # Match the OEM one-row scan flow by explicitly pre-positioning the transport to the
- # row start before issuing SCANX. Hardware testing showed the standalone XY move alone
- # can reintroduce the first-row edge-read problem.
- await self._send_command(f"ABSOLUTE MTP,X={start_x},Y={y_stage}")
- await self._send_command(f"SCAN DIRECTION={scan_direction}")
- await self._send_command(
- f"SCANX {start_x},{end_x},{count}", wait_for_terminal=False, read_response=False
- )
- logger.info(
- "Queued %s scan row %s (%s wells): y=%s, x=%s..%s",
- mode.lower(),
- row_index,
- count,
- y_stage,
- start_x,
- end_x,
- )
- await self._await_measurements(decoder, count, mode)
- await self._await_scan_terminal(decoder.pop_terminal())
+ @property
+ def _read_chunk_size(self):
+ return self._driver._read_chunk_size
- async def read_absorbance(
- self,
- plate: Plate,
- wells: List[Well],
- wavelength: int,
- flashes: int = 25,
- bandwidth: Optional[float] = None,
- ) -> List[Dict]:
- """Queue and execute an absorbance scan.
+ @property
+ def _max_row_wait_s(self):
+ return self._driver._max_row_wait_s
+
+ # -- lifecycle --
- Args:
- bandwidth: Excitation bandwidth in nm. If None, auto-selected (9 nm for >315 nm, 5 nm
- otherwise).
- """
+ async def setup(self) -> None:
+ await self._driver.setup()
- if not 230 <= wavelength <= 1_000:
- raise ValueError("Absorbance wavelength must be between 230 nm and 1000 nm.")
+ async def stop(self) -> None:
+ await self._driver.stop()
- ordered_wells = wells if wells else plate.get_all_items()
- scan_wells = self._scan_visit_order(ordered_wells, serpentine=True)
- decoder = _AbsorbanceRunDecoder(len(scan_wells))
+ # -- tray --
- await self._begin_run()
- try:
- await self._configure_absorbance(wavelength, flashes=flashes, bandwidth=bandwidth)
- await self._run_scan(
- ordered_wells=ordered_wells,
- decoder=decoder,
- mode="Absorbance",
- step_loss_commands=["CHECK MTP.STEPLOSS", "CHECK ABS.STEPLOSS"],
- serpentine=True,
- scan_direction="ALTUP",
- )
+ async def open(self) -> None:
+ await self._driver.open_tray()
- self._drain_pending_bin_events(decoder)
- if len(decoder.measurements) != len(scan_wells):
- raise RuntimeError("Absorbance decoder did not complete scan.")
- intensities: List[float] = []
- cal = decoder.calibration
- if cal is None:
- raise RuntimeError("ABS calibration packet not seen; cannot compute calibrated OD.")
- for meas in decoder.measurements:
- items = meas.items or [(meas.sample, meas.reference)]
- od = _absorbance_od_calibrated(cal, items)
- intensities.append(od)
- matrix = self._format_plate_result(plate, scan_wells, intensities)
- return [
- {
- "wavelength": wavelength,
- "time": time.time(),
- "temperature": None,
- "data": matrix,
- }
- ]
- finally:
- await self._end_run()
+ async def close(self, plate: Optional[Plate] = None) -> None: # noqa: ARG002
+ await self._driver.close_tray()
- async def _clear_mode_settings(self, excitation: bool = False, emission: bool = False) -> None:
- """Clear mode settings before configuring a new scan."""
- if excitation:
- await self._send_command("EXCITATION CLEAR", allow_timeout=True)
- if emission:
- await self._send_command("EMISSION CLEAR", allow_timeout=True)
- await self._send_command("TIME CLEAR", allow_timeout=True)
- await self._send_command("GAIN CLEAR", allow_timeout=True)
- await self._send_command("READS CLEAR", allow_timeout=True)
- await self._send_command("POSITION CLEAR", allow_timeout=True)
- await self._send_command("MIRROR CLEAR", allow_timeout=True)
+ # -- reads: delegate to backends, convert Result -> dict --
- async def _configure_absorbance(
+ async def read_absorbance(
self,
- wavelength_nm: int,
- *,
- flashes: int,
+ plate: Plate,
+ wells: List[Well],
+ wavelength: int,
+ flashes: int = 25,
bandwidth: Optional[float] = None,
- ) -> None:
- wl_decitenth = int(round(wavelength_nm * 10))
- bw = bandwidth if bandwidth is not None else self._auto_bandwidth(wavelength_nm)
- bw_decitenth = int(round(bw * 10))
- reads_number = max(1, flashes)
-
- await self._send_command("MODE ABS")
- await self._clear_mode_settings(excitation=True)
- await self._send_command(
- f"EXCITATION 0,ABS,{wl_decitenth},{bw_decitenth},0", allow_timeout=True
- )
- await self._send_command(
- f"EXCITATION 1,ABS,{wl_decitenth},{bw_decitenth},0", allow_timeout=True
- )
- await self._send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True)
- await self._send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True)
- await self._send_command("TIME 0,READDELAY=0", allow_timeout=True)
- await self._send_command("TIME 1,READDELAY=0", allow_timeout=True)
- await self._send_command("SCAN DIRECTION=ALTUP", allow_timeout=True)
- await self._send_command("#RATIO LABELS", allow_timeout=True)
- await self._send_command(
- f"BEAM DIAMETER={self._capability_numeric('ABS', '#BEAM DIAMETER', 700)}", allow_timeout=True
+ ) -> List[Dict]:
+ params = TecanInfiniteAbsorbanceParams(flashes=flashes, bandwidth=bandwidth)
+ results = await self._absorbance.read_absorbance(
+ plate=plate, wells=wells, wavelength=wavelength, backend_params=params,
)
- await self._send_command("RATIO LABELS=1", allow_timeout=True)
- await self._send_command("PREPARE REF", allow_timeout=True, read_response=False)
-
- def _auto_bandwidth(self, wavelength_nm: int) -> float:
- """Return bandwidth in nm based on Infinite M specification."""
-
- return 9.0 if wavelength_nm > 315 else 5.0
+ return [
+ {
+ "wavelength": r.wavelength,
+ "time": r.timestamp,
+ "temperature": r.temperature,
+ "data": r.data,
+ }
+ for r in results
+ ]
async def read_fluorescence(
self,
@@ -726,120 +209,32 @@ async def read_fluorescence(
emission_bandwidth: int = 200,
lag_us: int = 0,
) -> List[Dict]:
- """Queue and execute a fluorescence scan.
-
- Args:
- gain: PMT gain value (0-255).
- excitation_bandwidth: Excitation filter bandwidth in deci-tenths of nm.
- emission_bandwidth: Emission filter bandwidth in deci-tenths of nm.
- lag_us: Lag time in microseconds between excitation and measurement.
- """
-
- if not 230 <= excitation_wavelength <= 850:
- raise ValueError("Excitation wavelength must be between 230 nm and 850 nm.")
- if not 230 <= emission_wavelength <= 850:
- raise ValueError("Emission wavelength must be between 230 nm and 850 nm.")
- if focal_height < 0:
- raise ValueError("Focal height must be non-negative for fluorescence scans.")
-
- ordered_wells = wells if wells else plate.get_all_items()
- scan_wells = self._scan_visit_order(ordered_wells, serpentine=True)
-
- await self._begin_run()
- try:
- await self._configure_fluorescence(
- excitation_wavelength,
- emission_wavelength,
- focal_height,
- flashes=flashes,
- integration_us=integration_us,
- gain=gain,
- excitation_bandwidth=excitation_bandwidth,
- emission_bandwidth=emission_bandwidth,
- lag_us=lag_us,
- )
- decoder = _FluorescenceRunDecoder(len(scan_wells))
-
- await self._run_scan(
- ordered_wells=ordered_wells,
- decoder=decoder,
- mode="Fluorescence",
- step_loss_commands=[
- "CHECK MTP.STEPLOSS",
- "CHECK FI.TOP.STEPLOSS",
- "CHECK FI.STEPLOSS.Z",
- ],
- serpentine=True,
- scan_direction="UP",
- )
-
- if len(decoder.intensities) != len(scan_wells):
- raise RuntimeError("Fluorescence decoder did not complete scan.")
- intensities = decoder.intensities
- matrix = self._format_plate_result(plate, scan_wells, intensities)
- return [
- {
- "ex_wavelength": excitation_wavelength,
- "em_wavelength": emission_wavelength,
- "time": time.time(),
- "temperature": None,
- "data": matrix,
- }
- ]
- finally:
- await self._end_run()
-
- async def _configure_fluorescence(
- self,
- excitation_nm: int,
- emission_nm: int,
- focal_height: float,
- *,
- flashes: int,
- integration_us: int,
- gain: int,
- excitation_bandwidth: int,
- emission_bandwidth: int,
- lag_us: int,
- ) -> None:
- ex_decitenth = int(round(excitation_nm * 10))
- em_decitenth = int(round(emission_nm * 10))
- reads_number = max(1, flashes)
- beam_diameter = self._capability_numeric("FI.TOP", "#BEAM DIAMETER", 3000)
- z_position = int(round(focal_height * self.counts_per_mm_z))
-
- # UI issues the entire FI configuration twice before PREPARE REF.
- for _ in range(2):
- await self._send_command("MODE FI.TOP", allow_timeout=True)
- await self._clear_mode_settings(excitation=True, emission=True)
- await self._send_command(
- f"EXCITATION 0,FI,{ex_decitenth},{excitation_bandwidth},0", allow_timeout=True
- )
- await self._send_command(
- f"EMISSION 0,FI,{em_decitenth},{emission_bandwidth},0", allow_timeout=True
- )
- await self._send_command(f"TIME 0,INTEGRATION={integration_us}", allow_timeout=True)
- await self._send_command(f"TIME 0,LAG={lag_us}", allow_timeout=True)
- await self._send_command("TIME 0,READDELAY=0", allow_timeout=True)
- await self._send_command(f"GAIN 0,VALUE={gain}", allow_timeout=True)
- await self._send_command(f"POSITION 0,Z={z_position}", allow_timeout=True)
- await self._send_command(f"BEAM DIAMETER={beam_diameter}", allow_timeout=True)
- await self._send_command("SCAN DIRECTION=UP", allow_timeout=True)
- await self._send_command("RATIO LABELS=1", allow_timeout=True)
- await self._send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True)
- await self._send_command(
- f"EXCITATION 1,FI,{ex_decitenth},{excitation_bandwidth},0", allow_timeout=True
- )
- await self._send_command(
- f"EMISSION 1,FI,{em_decitenth},{emission_bandwidth},0", allow_timeout=True
- )
- await self._send_command(f"TIME 1,INTEGRATION={integration_us}", allow_timeout=True)
- await self._send_command(f"TIME 1,LAG={lag_us}", allow_timeout=True)
- await self._send_command("TIME 1,READDELAY=0", allow_timeout=True)
- await self._send_command(f"GAIN 1,VALUE={gain}", allow_timeout=True)
- await self._send_command(f"POSITION 1,Z={z_position}", allow_timeout=True)
- await self._send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True)
- await self._send_command("PREPARE REF", allow_timeout=True, read_response=False)
+ params = TecanInfiniteFluorescenceParams(
+ flashes=flashes,
+ integration_us=integration_us,
+ gain=gain,
+ excitation_bandwidth=excitation_bandwidth,
+ emission_bandwidth=emission_bandwidth,
+ lag_us=lag_us,
+ )
+ results = await self._fluorescence.read_fluorescence(
+ plate=plate,
+ wells=wells,
+ excitation_wavelength=excitation_wavelength,
+ emission_wavelength=emission_wavelength,
+ focal_height=focal_height,
+ backend_params=params,
+ )
+ return [
+ {
+ "ex_wavelength": r.excitation_wavelength,
+ "em_wavelength": r.emission_wavelength,
+ "time": r.timestamp,
+ "temperature": r.temperature,
+ "data": r.data,
+ }
+ for r in results
+ ]
async def read_luminescence(
self,
@@ -850,493 +245,56 @@ async def read_luminescence(
dark_integration_us: int = 3_000_000,
meas_integration_us: int = 1_000_000,
) -> List[Dict]:
- """Queue and execute a luminescence scan."""
-
- if focal_height < 0:
- raise ValueError("Focal height must be non-negative for luminescence scans.")
-
- ordered_wells = wells if wells else plate.get_all_items()
- scan_wells = self._scan_visit_order(ordered_wells, serpentine=False)
-
- dark_integration = dark_integration_us
- meas_integration = meas_integration_us
-
- await self._begin_run()
- try:
- await self._configure_luminescence(
- dark_integration, meas_integration, focal_height, flashes=flashes
- )
-
- decoder = _LuminescenceRunDecoder(
- len(scan_wells),
- dark_integration_s=_integration_microseconds_to_seconds(dark_integration),
- meas_integration_s=_integration_microseconds_to_seconds(meas_integration),
- )
-
- await self._run_scan(
- ordered_wells=ordered_wells,
- decoder=decoder,
- mode="Luminescence",
- step_loss_commands=["CHECK MTP.STEPLOSS", "CHECK LUM.STEPLOSS"],
- serpentine=False,
- scan_direction="UP",
- )
-
- if len(decoder.measurements) != len(scan_wells):
- raise RuntimeError("Luminescence decoder did not complete scan.")
- intensities = [measurement.intensity for measurement in decoder.measurements]
- matrix = self._format_plate_result(plate, scan_wells, intensities)
- return [
- {
- "time": time.time(),
- "temperature": None,
- "data": matrix,
- }
- ]
- finally:
- await self._end_run()
-
- async def _await_measurements(
- self, decoder: "_MeasurementDecoder", row_count: int, mode: str
- ) -> None:
- target = decoder.count + row_count
- start_count = decoder.count
- self._drain_pending_bin_events(decoder)
- start = time.monotonic()
- reads = 0
- while decoder.count < target and (time.monotonic() - start) < self._max_row_wait_s:
- chunk = await self._read_packet(self._read_chunk_size)
- if not chunk:
- raise RuntimeError(f"{mode} read returned empty chunk; transport may not support reads.")
- decoder.feed(chunk)
- reads += 1
- if decoder.count < target:
- got = decoder.count - start_count
- raise RuntimeError(
- f"Timed out while parsing {mode.lower()} results "
- f"(decoded {got}/{row_count} measurements in {time.monotonic() - start:.1f}s, {reads} reads)."
- )
-
- def _drain_pending_bin_events(self, decoder: "_MeasurementDecoder") -> None:
- if not self._pending_bin_events:
- return
- for payload_len, blob in self._pending_bin_events:
- decoder.feed_bin(payload_len, blob)
- self._pending_bin_events.clear()
-
- async def _await_scan_terminal(self, saw_terminal: bool) -> None:
- if saw_terminal:
- return
- await self._read_command_response()
-
- async def _configure_luminescence(
- self,
- dark_integration: int,
- meas_integration: int,
- focal_height: float,
- *,
- flashes: int,
- ) -> None:
- await self._send_command("MODE LUM")
- # Pre-flight safety checks observed in captures (queries omitted).
- await self._send_command("CHECK LUM.FIBER")
- await self._send_command("CHECK LUM.LID")
- await self._send_command("CHECK LUM.STEPLOSS")
- await self._send_command("MODE LUM")
- reads_number = max(1, flashes)
- z_position = int(round(focal_height * self.counts_per_mm_z))
- await self._clear_mode_settings(emission=True)
- await self._send_command(f"POSITION LUM,Z={z_position}", allow_timeout=True)
- await self._send_command(f"TIME 0,INTEGRATION={dark_integration}", allow_timeout=True)
- await self._send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True)
- await self._send_command("SCAN DIRECTION=UP", allow_timeout=True)
- await self._send_command("RATIO LABELS=1", allow_timeout=True)
- await self._send_command("EMISSION 1,EMPTY,0,0,0", allow_timeout=True)
- await self._send_command(f"TIME 1,INTEGRATION={meas_integration}", allow_timeout=True)
- await self._send_command("TIME 1,READDELAY=0", allow_timeout=True)
- await self._send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True)
- await self._send_command("#EMISSION ATTENUATION", allow_timeout=True)
- await self._send_command("PREPARE REF", allow_timeout=True, read_response=False)
-
- def _group_by_row(self, wells: Sequence[Well]) -> List[Tuple[int, List[Well]]]:
- grouped: Dict[int, List[Well]] = {}
- for well in wells:
- grouped.setdefault(well.get_row(), []).append(well)
- for row in grouped.values():
- row.sort(key=lambda w: w.get_column())
- return sorted(grouped.items(), key=lambda item: item[0])
-
- def _scan_visit_order(self, wells: Sequence[Well], serpentine: bool) -> List[Well]:
- visit: List[Well] = []
- for row_index, row_wells in self._group_by_row(wells):
- if serpentine and row_index % 2 == 1:
- visit.extend(reversed(row_wells))
- else:
- visit.extend(row_wells)
- return visit
-
- def _map_well_to_stage(self, well: Well) -> StagePosition:
- if well.location is None:
- raise ValueError("Well does not have a location assigned within its plate definition.")
- center = well.location + well.get_anchor(x="c", y="c")
- stage_x = int(round(center.x * self.counts_per_mm_x))
- parent_plate = well.parent
- if parent_plate is None or not isinstance(parent_plate, Plate):
- raise ValueError("Well is not assigned to a plate; cannot derive stage coordinates.")
- plate_height_mm = parent_plate.get_size_y()
- stage_y = int(round((plate_height_mm - center.y) * self.counts_per_mm_y))
- return stage_x, stage_y
-
- def _scan_range(
- self, row_index: int, row_wells: Sequence[Well], serpentine: bool
- ) -> Tuple[int, int, int]:
- """Return start/end/count for a row, honoring serpentine layout when requested."""
-
- first_x, _ = self._map_well_to_stage(row_wells[0])
- last_x, _ = self._map_well_to_stage(row_wells[-1])
- count = len(row_wells)
- if not serpentine:
- return min(first_x, last_x), max(first_x, last_x), count
- if row_index % 2 == 0:
- return first_x, last_x, count
- return last_x, first_x, count
-
- def _format_plate_result(
- self, plate: Plate, wells: Sequence[Well], values: Sequence[float]
- ) -> List[List[Optional[float]]]:
- matrix: List[List[Optional[float]]] = [
- [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y)
+ params = TecanInfiniteLuminescenceParams(
+ flashes=flashes,
+ dark_integration_us=dark_integration_us,
+ meas_integration_us=meas_integration_us,
+ )
+ results = await self._luminescence.read_luminescence(
+ plate=plate,
+ wells=wells,
+ focal_height=focal_height,
+ backend_params=params,
+ )
+ return [
+ {
+ "time": r.timestamp,
+ "temperature": r.temperature,
+ "data": r.data,
+ }
+ for r in results
]
- for well, val in zip(wells, values):
- r, c = well.get_row(), well.get_column()
- if 0 <= r < plate.num_items_y and 0 <= c < plate.num_items_x:
- matrix[r][c] = float(val)
- return matrix
-
- async def _initialize_device(self) -> None:
- try:
- await self._send_command("QQ")
- except TimeoutError:
- logger.warning("QQ produced no response; continuing with initialization.")
- await self._send_command("INIT FORCE")
-
- async def _begin_run(self) -> None:
- self._reset_stream_state()
- await self._send_command("KEYLOCK ON")
- self._run_active = True
-
- def _reset_stream_state(self) -> None:
- self._pending_bin_events.clear()
- self._parser = _StreamParser(allow_bare_ascii=True)
-
- async def _read_packet(self, size: int) -> bytes:
- try:
- data = await self.io.read(size=size)
- except TimeoutError:
- await self._recover_transport()
- raise
- return data
-
- async def _recover_transport(self) -> None:
- try:
- await self.io.stop()
- await asyncio.sleep(0.2)
- await self.io.setup()
- except Exception:
- logger.warning("Transport recovery failed.", exc_info=True)
- return
- self._mode_capabilities.clear()
- self._reset_stream_state()
- await self._initialize_device()
-
- async def _end_run(self) -> None:
- try:
- await self._send_command("TERMINATE", allow_timeout=True)
- for cmd in self._active_step_loss_commands:
- await self._send_command(cmd, allow_timeout=True)
- await self._send_command("KEYLOCK OFF", allow_timeout=True)
- await self._send_command("ABSOLUTE MTP,IN", allow_timeout=True)
- finally:
- self._run_active = False
- self._active_step_loss_commands = []
-
- async def _cleanup_protocol(self) -> None:
- async def send_cleanup_cmd(cmd: str) -> None:
- try:
- await self._send_command(cmd, allow_timeout=True, read_response=False)
- except Exception:
- logger.warning("Cleanup command failed: %s", cmd)
- if self._run_active or self._active_step_loss_commands:
- await send_cleanup_cmd("TERMINATE")
- for cmd in self._active_step_loss_commands:
- await send_cleanup_cmd(cmd)
- await send_cleanup_cmd("KEYLOCK OFF")
- await send_cleanup_cmd("ABSOLUTE MTP,IN")
- self._run_active = False
- self._active_step_loss_commands = []
-
- async def _query_mode_capabilities(self, mode: str) -> None:
- commands = self._MODE_CAPABILITY_COMMANDS.get(mode)
- if not commands:
- return
- try:
- await self._send_command(f"MODE {mode}")
- except TimeoutError:
- logger.warning("Capability MODE %s timed out; continuing without mode capabilities.", mode)
- return
- collected: Dict[str, str] = {}
- for cmd in commands:
- try:
- frames = await self._send_command(cmd)
- except TimeoutError:
- logger.warning("Capability query '%s' timed out; proceeding with defaults.", cmd)
- continue
- if frames:
- collected[cmd] = frames[-1]
- if collected:
- self._mode_capabilities[mode] = collected
-
- def _get_mode_capability(self, mode: str, command: str) -> Optional[str]:
- return self._mode_capabilities.get(mode, {}).get(command)
-
- def _capability_numeric(self, mode: str, command: str, fallback: int) -> int:
- resp = self._get_mode_capability(mode, command)
- if not resp:
- return fallback
- token = resp.split("|")[0].split(":")[0].split("~")[0].strip()
- if not token:
- return fallback
- try:
- return int(float(token))
- except ValueError:
- return fallback
+ # -- method delegates for test compat --
@staticmethod
def _frame_command(command: str) -> bytes:
- """Return a framed command with length/checksum trailer."""
-
- payload = command.encode("ascii")
- xor = 0
- for byte in payload:
- xor ^= byte
- checksum = (xor ^ 0x01) & 0xFF
- length = len(payload) & 0xFF
- return b"\x02" + payload + b"\x03\x00\x00" + bytes([length, checksum]) + b"\x0d"
-
- async def _send_command(
- self,
- command: str,
- wait_for_terminal: bool = True,
- allow_timeout: bool = False,
- read_response: bool = True,
- ) -> List[str]:
- logger.debug("[tecan] >> %s", command)
- framed = self._frame_command(command)
- await self.io.write(framed)
- if not read_response:
- return []
- if command.startswith(("#", "?")):
- try:
- return await self._read_command_response(require_terminal=False)
- except TimeoutError:
- if allow_timeout:
- logger.warning("Timeout waiting for response to %s", command)
- return []
- raise
- try:
- frames = await self._read_command_response(require_terminal=wait_for_terminal)
- except TimeoutError:
- if allow_timeout:
- logger.warning("Timeout waiting for response to %s", command)
- return []
- raise
- for pkt in frames:
- logger.debug("[tecan] << %s", pkt)
- return frames
-
- async def _drain(self, attempts: int = 4) -> None:
- """Read and discard a few packets to clear the stream."""
- for _ in range(attempts):
- data = await self._read_packet(128)
- if not data:
- break
-
- async def _read_command_response(
- self, max_iterations: int = 8, require_terminal: bool = True
- ) -> List[str]:
- """Read response frames and cache any binary payloads that arrive."""
- frames: List[str] = []
- saw_terminal = False
- for _ in range(max_iterations):
- chunk = await self._read_packet(128)
- if not chunk:
- break
- for event in self._parser.feed(chunk):
- if event.text is not None:
- frames.append(event.text)
- if self._is_terminal_frame(event.text):
- saw_terminal = True
- elif event.payload_len is not None and event.blob is not None:
- self._pending_bin_events.append((event.payload_len, event.blob))
- if not require_terminal and frames and not self._parser.has_pending_bin():
- break
- if require_terminal and saw_terminal and not self._parser.has_pending_bin():
- break
- if require_terminal and not saw_terminal:
- # best effort: drain once more so pending ST doesn't leak into next command
- await self._drain(1)
- return frames
+ return frame_command(command)
@staticmethod
def _is_terminal_frame(text: str) -> bool:
- """Return True if the ASCII frame is a terminal marker."""
- return text in {"ST", "+", "-"} or text.startswith("BY#T")
-
+ return is_terminal_frame(text)
-@dataclass
-class _AbsorbanceMeasurement:
- sample: int
- reference: int
- items: Optional[List[Tuple[int, int]]] = None
+ def _scan_visit_order(self, wells, serpentine=True):
+ return self._driver.scan_visit_order(wells, serpentine)
+ def _group_by_row(self, wells):
+ return self._driver.group_by_row(wells)
-class _AbsorbanceRunDecoder(_MeasurementDecoder):
- """Incrementally decode absorbance measurement frames."""
+ def _scan_range(self, row_index, row_wells, serpentine=True):
+ return self._driver.scan_range(row_index, row_wells, serpentine)
- STATUS_FRAME_LEN = 31
-
- def __init__(self, expected: int) -> None:
- super().__init__(expected)
- self.measurements: List[_AbsorbanceMeasurement] = []
- self._calibration: Optional[_AbsorbanceCalibration] = None
-
- @property
- def count(self) -> int:
- return len(self.measurements)
+ def _map_well_to_stage(self, well):
+ return self._driver.map_well_to_stage(well)
- @property
- def calibration(self) -> Optional[_AbsorbanceCalibration]:
- """Return the absorbance calibration data, if available."""
- return self._calibration
-
- def _should_consume_bin(self, payload_len: int) -> bool:
- return _is_abs_calibration_len(payload_len) or _is_abs_data_len(payload_len)
-
- def _handle_bin(self, payload_len: int, blob: bytes) -> None:
- if _is_abs_calibration_len(payload_len):
- if self._calibration is not None:
- return
- cal = _decode_abs_calibration(payload_len, blob)
- if cal is not None:
- self._calibration = cal
- return
- if _is_abs_data_len(payload_len):
- data = _decode_abs_data(payload_len, blob)
- if data is None:
- return
- _label, _ex, items = data
- sample, reference = items[0] if items else (0, 0)
- self.measurements.append(
- _AbsorbanceMeasurement(sample=sample, reference=reference, items=items)
- )
-
-
-class _FluorescenceRunDecoder(_MeasurementDecoder):
- """Incrementally decode fluorescence measurement frames."""
-
- STATUS_FRAME_LEN = 31
-
- def __init__(self, expected_wells: int) -> None:
- super().__init__(expected_wells)
- self._intensities: List[int] = []
- self._calibration: Optional[_FluorescenceCalibration] = None
-
- @property
- def count(self) -> int:
- return len(self._intensities)
-
- @property
- def intensities(self) -> List[int]:
- """Return decoded fluorescence intensities."""
- return self._intensities
-
- def _should_consume_bin(self, payload_len: int) -> bool:
- if payload_len == 18:
- return True
- if payload_len >= 16 and (payload_len - 6) % 10 == 0:
- return True
- return False
-
- def _handle_bin(self, payload_len: int, blob: bytes) -> None:
- if payload_len == 18:
- cal = _decode_flr_calibration(payload_len, blob)
- if cal is not None:
- self._calibration = cal
- return
- data = _decode_flr_data(payload_len, blob)
- if data is None:
- return
- _label, _ex, _em, items = data
- if self._calibration is not None:
- intensity = _fluorescence_corrected(self._calibration, items)
- else:
- if not items:
- intensity = 0
- else:
- intensity = int(round(sum(m for m, _ in items) / len(items)))
- self._intensities.append(intensity)
-
-
-@dataclass
-class _LuminescenceMeasurement:
- intensity: int
-
-
-class _LuminescenceRunDecoder(_MeasurementDecoder):
- """Incrementally decode luminescence measurement frames."""
-
- def __init__(
- self,
- expected: int,
- *,
- dark_integration_s: float = 0.0,
- meas_integration_s: float = 0.0,
- ) -> None:
- super().__init__(expected)
- self.measurements: List[_LuminescenceMeasurement] = []
- self._calibration: Optional[_LuminescenceCalibration] = None
- self._dark_integration_s = float(dark_integration_s)
- self._meas_integration_s = float(meas_integration_s)
-
- @property
- def count(self) -> int:
- return len(self.measurements)
+ def _format_plate_result(self, plate, scan_wells, values):
+ return format_plate_result(plate, scan_wells, values)
- def _should_consume_bin(self, payload_len: int) -> bool:
- if payload_len == 10:
- return True
- if payload_len >= 14 and (payload_len - 4) % 10 == 0:
- return True
- return False
+ def _capability_numeric(self, mode, command, fallback):
+ return self._driver.capability_numeric(mode, command, fallback)
- def _handle_bin(self, payload_len: int, blob: bytes) -> None:
- if payload_len == 10:
- cal = _decode_lum_calibration(payload_len, blob)
- if cal is not None:
- self._calibration = cal
- return
- data = _decode_lum_data(payload_len, blob)
- if data is None:
- return
- _label, _em, counts = data
- if self._calibration is not None and self._dark_integration_s and self._meas_integration_s:
- intensity = _luminescence_intensity(
- self._calibration, counts, self._dark_integration_s, self._meas_integration_s
- )
- else:
- intensity = int(round(sum(counts) / len(counts))) if counts else 0
- self.measurements.append(_LuminescenceMeasurement(intensity=intensity))
+ async def _send_command(self, command, **kwargs):
+ return await self._driver.send_command(command, **kwargs)
__all__ = [
diff --git a/pylabrobot/legacy/plate_reading/tecan/infinite_backend_tests.py b/pylabrobot/legacy/plate_reading/tecan/infinite_backend_tests.py
index 284254d75b0..1501e4a4a82 100644
--- a/pylabrobot/legacy/plate_reading/tecan/infinite_backend_tests.py
+++ b/pylabrobot/legacy/plate_reading/tecan/infinite_backend_tests.py
@@ -606,7 +606,7 @@ def test_scan_visit_order_linear(self):
self.assertEqual(identifiers, ["A1", "A2", "A3", "B1", "B2", "B3"])
def test_scan_range_serpentine(self):
- setattr(self.backend, "_map_well_to_stage", lambda well: (well.get_column(), well.get_row()))
+ setattr(self.backend._driver, "map_well_to_stage", lambda well: (well.get_column(), well.get_row()))
row_index, row_wells = self.backend._group_by_row(self.plate.get_all_items())[0]
start_x, end_x, count = self.backend._scan_range(row_index, row_wells, serpentine=True)
self.assertEqual((start_x, end_x, count), (0, 2, 3))
@@ -704,8 +704,8 @@ async def mock_await(decoder, row_count, mode):
data_len, data_blob = _abs_data_blob(6000, 500, 1000)
decoder.feed_bin(data_len, data_blob)
- with patch.object(self.backend, "_await_measurements", side_effect=mock_await):
- with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock):
+ with patch.object(self.backend._driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(self.backend._driver, "_await_scan_terminal", new_callable=AsyncMock):
await self.backend.read_absorbance(self.plate, [], wavelength=600)
self.mock_usb.write.assert_has_calls(
@@ -765,8 +765,8 @@ async def mock_terminal(_saw_terminal):
cal_len, cal_blob = _abs_calibration_blob(6000, 0, 1000, 0, 1000)
self.backend._pending_bin_events.append((cal_len, cal_blob))
- with patch.object(self.backend, "_await_measurements", side_effect=mock_await):
- with patch.object(self.backend, "_await_scan_terminal", side_effect=mock_terminal):
+ with patch.object(self.backend._driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(self.backend._driver, "_await_scan_terminal", side_effect=mock_terminal):
result = await self.backend.read_absorbance(self.plate, [], wavelength=600)
self.assertAlmostEqual(result[0]["data"][0][0], 0.3010299956639812)
@@ -783,8 +783,8 @@ async def mock_await(decoder, row_count, mode):
data_len, data_blob = _abs_data_blob(6000, 500, 1000)
decoder.feed_bin(data_len, data_blob)
- with patch.object(self.backend, "_await_measurements", side_effect=mock_await):
- with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock):
+ with patch.object(self.backend._driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(self.backend._driver, "_await_scan_terminal", new_callable=AsyncMock):
result = await self.backend.read_absorbance(self.plate, wells, wavelength=600)
self.mock_usb.write.assert_has_calls(
@@ -817,8 +817,8 @@ async def mock_await(decoder, row_count, mode):
data_len, data_blob = _flr_data_blob(4850, 5200, 500, 1000)
decoder.feed_bin(data_len, data_blob)
- with patch.object(self.backend, "_await_measurements", side_effect=mock_await):
- with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock):
+ with patch.object(self.backend._driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(self.backend._driver, "_await_scan_terminal", new_callable=AsyncMock):
await self.backend.read_fluorescence(
self.plate, [], excitation_wavelength=485, emission_wavelength=520
)
@@ -892,8 +892,8 @@ async def mock_await(decoder, row_count, mode):
data_len, data_blob = _lum_data_blob(0, 1000)
decoder.feed_bin(data_len, data_blob)
- with patch.object(self.backend, "_await_measurements", side_effect=mock_await):
- with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock):
+ with patch.object(self.backend._driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(self.backend._driver, "_await_scan_terminal", new_callable=AsyncMock):
await self.backend.read_luminescence(self.plate, [], focal_height=14.62)
self.mock_usb.write.assert_has_calls(
@@ -952,8 +952,8 @@ async def mock_await(decoder, row_count, mode):
data_len, data_blob = _lum_data_blob(0, 1000)
decoder.feed_bin(data_len, data_blob)
- with patch.object(self.backend, "_await_measurements", side_effect=mock_await):
- with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock):
+ with patch.object(self.backend._driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(self.backend._driver, "_await_scan_terminal", new_callable=AsyncMock):
await self.backend.read_luminescence(self.plate, [])
self.mock_usb.write.assert_any_call(self._frame("POSITION LUM,Z=20000"))
diff --git a/pylabrobot/tecan/__init__.py b/pylabrobot/tecan/__init__.py
new file mode 100644
index 00000000000..f4cd5db5c8a
--- /dev/null
+++ b/pylabrobot/tecan/__init__.py
@@ -0,0 +1,10 @@
+from .infinite import (
+ TecanInfinite200Pro,
+ TecanInfiniteAbsorbanceBackend,
+ TecanInfiniteAbsorbanceParams,
+ TecanInfiniteDriver,
+ TecanInfiniteFluorescenceBackend,
+ TecanInfiniteFluorescenceParams,
+ TecanInfiniteLuminescenceBackend,
+ TecanInfiniteLuminescenceParams,
+)
diff --git a/pylabrobot/tecan/infinite/__init__.py b/pylabrobot/tecan/infinite/__init__.py
new file mode 100644
index 00000000000..77a389d0b80
--- /dev/null
+++ b/pylabrobot/tecan/infinite/__init__.py
@@ -0,0 +1,5 @@
+from .absorbance_backend import TecanInfiniteAbsorbanceBackend, TecanInfiniteAbsorbanceParams
+from .driver import TecanInfiniteDriver
+from .fluorescence_backend import TecanInfiniteFluorescenceBackend, TecanInfiniteFluorescenceParams
+from .infinite import TecanInfinite200Pro
+from .luminescence_backend import TecanInfiniteLuminescenceBackend, TecanInfiniteLuminescenceParams
diff --git a/pylabrobot/tecan/infinite/absorbance_backend.py b/pylabrobot/tecan/infinite/absorbance_backend.py
new file mode 100644
index 00000000000..982d2f99d15
--- /dev/null
+++ b/pylabrobot/tecan/infinite/absorbance_backend.py
@@ -0,0 +1,132 @@
+"""Tecan Infinite 200 PRO absorbance backend."""
+
+from __future__ import annotations
+
+import logging
+import time
+from dataclasses import dataclass
+from typing import List, Optional
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.capabilities.plate_reading.absorbance.backend import AbsorbanceBackend
+from pylabrobot.capabilities.plate_reading.absorbance.standard import AbsorbanceResult
+from pylabrobot.resources.plate import Plate
+from pylabrobot.resources.well import Well
+from pylabrobot.serializer import SerializableMixin
+
+from .driver import TecanInfiniteDriver
+from .protocol import _AbsorbanceRunDecoder, _absorbance_od_calibrated, format_plate_result
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TecanInfiniteAbsorbanceParams(BackendParams):
+ """Tecan Infinite-specific parameters for absorbance reads.
+
+ Args:
+ flashes: Number of flashes (reads) per well. Default 25.
+ bandwidth: Excitation bandwidth in nm. If None, auto-selected
+ (9 nm for >315 nm, 5 nm otherwise).
+ """
+
+ flashes: int = 25
+ bandwidth: Optional[float] = None
+
+
+class TecanInfiniteAbsorbanceBackend(AbsorbanceBackend):
+ """Translates AbsorbanceBackend interface into Tecan Infinite driver commands."""
+
+ def __init__(self, driver: TecanInfiniteDriver):
+ self.driver = driver
+
+ async def read_absorbance(
+ self,
+ plate: Plate,
+ wells: List[Well],
+ wavelength: int,
+ backend_params: Optional[SerializableMixin] = None,
+ ) -> List[AbsorbanceResult]:
+ if not isinstance(backend_params, TecanInfiniteAbsorbanceParams):
+ backend_params = TecanInfiniteAbsorbanceParams()
+
+ if not 230 <= wavelength <= 1_000:
+ raise ValueError("Absorbance wavelength must be between 230 nm and 1000 nm.")
+
+ ordered_wells = wells if wells else plate.get_all_items()
+ scan_wells = self.driver.scan_visit_order(ordered_wells, serpentine=True)
+ decoder = _AbsorbanceRunDecoder(len(scan_wells))
+
+ await self.driver.begin_run()
+ try:
+ await self._configure_absorbance(
+ wavelength, flashes=backend_params.flashes, bandwidth=backend_params.bandwidth
+ )
+ await self.driver.run_scan(
+ ordered_wells=ordered_wells,
+ decoder=decoder,
+ mode="Absorbance",
+ step_loss_commands=["CHECK MTP.STEPLOSS", "CHECK ABS.STEPLOSS"],
+ serpentine=True,
+ scan_direction="ALTUP",
+ )
+
+ self.driver.drain_pending_bin_events(decoder)
+ if len(decoder.measurements) != len(scan_wells):
+ raise RuntimeError("Absorbance decoder did not complete scan.")
+ intensities: List[float] = []
+ cal = decoder.calibration
+ if cal is None:
+ raise RuntimeError("ABS calibration packet not seen; cannot compute calibrated OD.")
+ for meas in decoder.measurements:
+ items = meas.items or [(meas.sample, meas.reference)]
+ od = _absorbance_od_calibrated(cal, items)
+ intensities.append(od)
+ matrix = format_plate_result(plate, scan_wells, intensities)
+ return [
+ AbsorbanceResult(
+ data=matrix,
+ wavelength=wavelength,
+ temperature=None,
+ timestamp=time.time(),
+ )
+ ]
+ finally:
+ await self.driver.end_run()
+
+ async def _configure_absorbance(
+ self,
+ wavelength_nm: int,
+ *,
+ flashes: int,
+ bandwidth: Optional[float] = None,
+ ) -> None:
+ wl_decitenth = int(round(wavelength_nm * 10))
+ bw = bandwidth if bandwidth is not None else self._auto_bandwidth(wavelength_nm)
+ bw_decitenth = int(round(bw * 10))
+ reads_number = max(1, flashes)
+
+ await self.driver.send_command("MODE ABS")
+ await self.driver.clear_mode_settings(excitation=True)
+ await self.driver.send_command(
+ f"EXCITATION 0,ABS,{wl_decitenth},{bw_decitenth},0", allow_timeout=True
+ )
+ await self.driver.send_command(
+ f"EXCITATION 1,ABS,{wl_decitenth},{bw_decitenth},0", allow_timeout=True
+ )
+ await self.driver.send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True)
+ await self.driver.send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True)
+ await self.driver.send_command("TIME 0,READDELAY=0", allow_timeout=True)
+ await self.driver.send_command("TIME 1,READDELAY=0", allow_timeout=True)
+ await self.driver.send_command("SCAN DIRECTION=ALTUP", allow_timeout=True)
+ await self.driver.send_command("#RATIO LABELS", allow_timeout=True)
+ await self.driver.send_command(
+ f"BEAM DIAMETER={self.driver.capability_numeric('ABS', '#BEAM DIAMETER', 700)}",
+ allow_timeout=True,
+ )
+ await self.driver.send_command("RATIO LABELS=1", allow_timeout=True)
+ await self.driver.send_command("PREPARE REF", allow_timeout=True, read_response=False)
+
+ @staticmethod
+ def _auto_bandwidth(wavelength_nm: int) -> float:
+ return 9.0 if wavelength_nm > 315 else 5.0
diff --git a/pylabrobot/tecan/infinite/driver.py b/pylabrobot/tecan/infinite/driver.py
new file mode 100644
index 00000000000..1562dacbe79
--- /dev/null
+++ b/pylabrobot/tecan/infinite/driver.py
@@ -0,0 +1,425 @@
+"""Tecan Infinite 200 PRO driver.
+
+Owns the USB connection and its lifecycle, device-level operations
+(initialize, tray control, keylock), shared scan orchestration, and
+well-to-stage geometry.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import time
+from typing import Dict, List, Optional, Sequence, Tuple
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.device import Driver
+from pylabrobot.io.usb import USB
+from pylabrobot.resources.plate import Plate
+from pylabrobot.resources.well import Well
+
+from .protocol import (
+ _MeasurementDecoder,
+ _StreamParser,
+ StagePosition,
+ frame_command,
+ is_terminal_frame,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TecanInfiniteDriver(Driver):
+ """USB driver for the Tecan Infinite 200 PRO plate reader.
+
+ Owns the USB connection, low-level command protocol, device-level operations
+ (tray open/close, initialization), shared scan orchestration, and well-to-stage
+ geometry.
+ """
+
+ VENDOR_ID = 0x0C47
+ PRODUCT_ID = 0x8007
+
+ _MODE_CAPABILITY_COMMANDS: Dict[str, List[str]] = {
+ "ABS": ["#BEAM DIAMETER"],
+ "FI.TOP": [],
+ "FI.BOTTOM": [],
+ "LUM": [],
+ }
+
+ def __init__(
+ self,
+ counts_per_mm_x: float = 1_000,
+ counts_per_mm_y: float = 1_000,
+ counts_per_mm_z: float = 1_000,
+ io: Optional[USB] = None,
+ ) -> None:
+ """
+ Args:
+ counts_per_mm_x: Stage counts per mm in X.
+ counts_per_mm_y: Stage counts per mm in Y.
+ counts_per_mm_z: Stage counts per mm in Z.
+ io: Optional USB I/O instance (for test injection).
+ """
+ super().__init__()
+ self.io = io or USB(
+ id_vendor=self.VENDOR_ID,
+ id_product=self.PRODUCT_ID,
+ human_readable_device_name="Tecan Infinite 200 PRO",
+ packet_read_timeout=3,
+ read_timeout=30,
+ )
+ self.counts_per_mm_x = counts_per_mm_x
+ self.counts_per_mm_y = counts_per_mm_y
+ self.counts_per_mm_z = counts_per_mm_z
+ self._setup_lock = asyncio.Lock()
+ self._ready = False
+ self._read_chunk_size = 512
+ self._max_row_wait_s = 300.0
+ self._mode_capabilities: Dict[str, Dict[str, str]] = {}
+ self._pending_bin_events: List[Tuple[int, bytes]] = []
+ self._parser = _StreamParser(allow_bare_ascii=True)
+ self._run_active = False
+ self._active_step_loss_commands: List[str] = []
+
+ # -- lifecycle --
+
+ async def setup(self, backend_params: Optional[BackendParams] = None) -> None:
+ async with self._setup_lock:
+ if self._ready:
+ return
+ await self.io.setup()
+ await self._initialize_device()
+ for mode in self._MODE_CAPABILITY_COMMANDS:
+ if mode not in self._mode_capabilities:
+ await self._query_mode_capabilities(mode)
+ self._ready = True
+
+ async def stop(self) -> None:
+ async with self._setup_lock:
+ if not self._ready:
+ return
+ await self._cleanup_protocol()
+ await self.io.stop()
+ self._mode_capabilities.clear()
+ self._reset_stream_state()
+ self._ready = False
+
+ # -- device-level operations --
+
+ async def open_tray(self) -> None:
+ """Open the reader drawer."""
+ await self.send_command("ABSOLUTE MTP,OUT")
+ await self.send_command("BY#T5000")
+
+ async def close_tray(self) -> None:
+ """Close the reader drawer."""
+ await self.send_command("ABSOLUTE MTP,IN")
+ await self.send_command("BY#T5000")
+
+ # -- generic I/O --
+
+ async def send_command(
+ self,
+ command: str,
+ wait_for_terminal: bool = True,
+ allow_timeout: bool = False,
+ read_response: bool = True,
+ ) -> List[str]:
+ """Send a framed ASCII command and read response frames."""
+ logger.debug("[tecan] >> %s", command)
+ framed = frame_command(command)
+ await self.io.write(framed)
+ if not read_response:
+ return []
+ if command.startswith(("#", "?")):
+ try:
+ return await self._read_command_response(require_terminal=False)
+ except TimeoutError:
+ if allow_timeout:
+ logger.warning("Timeout waiting for response to %s", command)
+ return []
+ raise
+ try:
+ frames = await self._read_command_response(require_terminal=wait_for_terminal)
+ except TimeoutError:
+ if allow_timeout:
+ logger.warning("Timeout waiting for response to %s", command)
+ return []
+ raise
+ for pkt in frames:
+ logger.debug("[tecan] << %s", pkt)
+ return frames
+
+ async def read_packet(self, size: int) -> bytes:
+ """Read raw bytes from the USB transport."""
+ try:
+ data = await self.io.read(size=size)
+ except TimeoutError:
+ await self._recover_transport()
+ raise
+ return data
+
+ # -- scan orchestration --
+
+ async def begin_run(self) -> None:
+ """Begin a measurement run (KEYLOCK ON, reset stream state)."""
+ self._reset_stream_state()
+ await self.send_command("KEYLOCK ON")
+ self._run_active = True
+
+ async def end_run(self) -> None:
+ """End a measurement run (TERMINATE, step loss checks, KEYLOCK OFF, MTP IN)."""
+ try:
+ await self.send_command("TERMINATE", allow_timeout=True)
+ for cmd in self._active_step_loss_commands:
+ await self.send_command(cmd, allow_timeout=True)
+ await self.send_command("KEYLOCK OFF", allow_timeout=True)
+ await self.send_command("ABSOLUTE MTP,IN", allow_timeout=True)
+ finally:
+ self._run_active = False
+ self._active_step_loss_commands = []
+
+ async def run_scan(
+ self,
+ ordered_wells: Sequence[Well],
+ decoder: _MeasurementDecoder,
+ mode: str,
+ step_loss_commands: List[str],
+ serpentine: bool,
+ scan_direction: str,
+ ) -> None:
+ """Run the common scan loop for all measurement types.
+
+ Args:
+ ordered_wells: The wells to scan in row-major order.
+ decoder: The decoder to use for parsing measurements.
+ mode: The mode name for logging (e.g., "Absorbance").
+ step_loss_commands: Commands to run after the scan to check for step loss.
+ serpentine: Whether to use serpentine scan order.
+      scan_direction: The value for the "SCAN DIRECTION=" command (e.g., "ALTUP", "UP").
+ """
+ self._active_step_loss_commands = step_loss_commands
+
+ for row_index, row_wells in self.group_by_row(ordered_wells):
+ start_x, end_x, count = self.scan_range(row_index, row_wells, serpentine=serpentine)
+ _, y_stage = self.map_well_to_stage(row_wells[0])
+
+ await self.send_command(f"ABSOLUTE MTP,Y={y_stage}")
+ await self.send_command(f"ABSOLUTE MTP,X={start_x},Y={y_stage}")
+ await self.send_command(f"SCAN DIRECTION={scan_direction}")
+ await self.send_command(
+ f"SCANX {start_x},{end_x},{count}", wait_for_terminal=False, read_response=False
+ )
+ logger.info(
+ "Queued %s scan row %s (%s wells): y=%s, x=%s..%s",
+ mode.lower(),
+ row_index,
+ count,
+ y_stage,
+ start_x,
+ end_x,
+ )
+ await self._await_measurements(decoder, count, mode)
+ await self._await_scan_terminal(decoder.pop_terminal())
+
+ # -- mode capability queries --
+
+ async def _query_mode_capabilities(self, mode: str) -> None:
+ commands = self._MODE_CAPABILITY_COMMANDS.get(mode)
+ if not commands:
+ return
+ try:
+ await self.send_command(f"MODE {mode}")
+ except TimeoutError:
+ logger.warning("Capability MODE %s timed out; continuing without mode capabilities.", mode)
+ return
+ collected: Dict[str, str] = {}
+ for cmd in commands:
+ try:
+ frames = await self.send_command(cmd)
+ except TimeoutError:
+ logger.warning("Capability query '%s' timed out; proceeding with defaults.", cmd)
+ continue
+ if frames:
+ collected[cmd] = frames[-1]
+ if collected:
+ self._mode_capabilities[mode] = collected
+
+ def get_mode_capability(self, mode: str, command: str) -> Optional[str]:
+ return self._mode_capabilities.get(mode, {}).get(command)
+
+ def capability_numeric(self, mode: str, command: str, fallback: int) -> int:
+ resp = self.get_mode_capability(mode, command)
+ if not resp:
+ return fallback
+ token = resp.split("|")[0].split(":")[0].split("~")[0].strip()
+ if not token:
+ return fallback
+ try:
+ return int(float(token))
+ except ValueError:
+ return fallback
+
+ # -- mode settings --
+
+ async def clear_mode_settings(self, excitation: bool = False, emission: bool = False) -> None:
+ """Clear mode settings before configuring a new scan."""
+ if excitation:
+ await self.send_command("EXCITATION CLEAR", allow_timeout=True)
+ if emission:
+ await self.send_command("EMISSION CLEAR", allow_timeout=True)
+ await self.send_command("TIME CLEAR", allow_timeout=True)
+ await self.send_command("GAIN CLEAR", allow_timeout=True)
+ await self.send_command("READS CLEAR", allow_timeout=True)
+ await self.send_command("POSITION CLEAR", allow_timeout=True)
+ await self.send_command("MIRROR CLEAR", allow_timeout=True)
+
+ # -- geometry --
+
+ def map_well_to_stage(self, well: Well) -> StagePosition:
+ if well.location is None:
+ raise ValueError("Well does not have a location assigned within its plate definition.")
+ center = well.location + well.get_anchor(x="c", y="c")
+ stage_x = int(round(center.x * self.counts_per_mm_x))
+ parent_plate = well.parent
+ if parent_plate is None or not isinstance(parent_plate, Plate):
+ raise ValueError("Well is not assigned to a plate; cannot derive stage coordinates.")
+ plate_height_mm = parent_plate.get_size_y()
+ stage_y = int(round((plate_height_mm - center.y) * self.counts_per_mm_y))
+ return stage_x, stage_y
+
+ def group_by_row(self, wells: Sequence[Well]) -> List[Tuple[int, List[Well]]]:
+ grouped: Dict[int, List[Well]] = {}
+ for well in wells:
+ grouped.setdefault(well.get_row(), []).append(well)
+ for row in grouped.values():
+ row.sort(key=lambda w: w.get_column())
+ return sorted(grouped.items(), key=lambda item: item[0])
+
+ def scan_visit_order(self, wells: Sequence[Well], serpentine: bool) -> List[Well]:
+ visit: List[Well] = []
+ for row_index, row_wells in self.group_by_row(wells):
+ if serpentine and row_index % 2 == 1:
+ visit.extend(reversed(row_wells))
+ else:
+ visit.extend(row_wells)
+ return visit
+
+ def scan_range(
+ self, row_index: int, row_wells: Sequence[Well], serpentine: bool
+ ) -> Tuple[int, int, int]:
+ first_x, _ = self.map_well_to_stage(row_wells[0])
+ last_x, _ = self.map_well_to_stage(row_wells[-1])
+ count = len(row_wells)
+ if not serpentine:
+ return min(first_x, last_x), max(first_x, last_x), count
+ if row_index % 2 == 0:
+ return first_x, last_x, count
+ return last_x, first_x, count
+
+ # -- internal helpers --
+
+ async def _initialize_device(self) -> None:
+ try:
+ await self.send_command("QQ")
+ except TimeoutError:
+ logger.warning("QQ produced no response; continuing with initialization.")
+ await self.send_command("INIT FORCE")
+
+ async def _cleanup_protocol(self) -> None:
+ async def send_cleanup_cmd(cmd: str) -> None:
+ try:
+ await self.send_command(cmd, allow_timeout=True, read_response=False)
+ except Exception:
+ logger.warning("Cleanup command failed: %s", cmd)
+
+ if self._run_active or self._active_step_loss_commands:
+ await send_cleanup_cmd("TERMINATE")
+ for cmd in self._active_step_loss_commands:
+ await send_cleanup_cmd(cmd)
+ await send_cleanup_cmd("KEYLOCK OFF")
+ await send_cleanup_cmd("ABSOLUTE MTP,IN")
+ self._run_active = False
+ self._active_step_loss_commands = []
+
+ async def _await_measurements(
+ self, decoder: _MeasurementDecoder, row_count: int, mode: str
+ ) -> None:
+ target = decoder.count + row_count
+ start_count = decoder.count
+ self.drain_pending_bin_events(decoder)
+ start = time.monotonic()
+ reads = 0
+ while decoder.count < target and (time.monotonic() - start) < self._max_row_wait_s:
+ chunk = await self.read_packet(self._read_chunk_size)
+ if not chunk:
+ raise RuntimeError(f"{mode} read returned empty chunk; transport may not support reads.")
+ decoder.feed(chunk)
+ reads += 1
+ if decoder.count < target:
+ got = decoder.count - start_count
+ raise RuntimeError(
+ f"Timed out while parsing {mode.lower()} results "
+ f"(decoded {got}/{row_count} measurements in {time.monotonic() - start:.1f}s, {reads} reads)."
+ )
+
+ def drain_pending_bin_events(self, decoder: _MeasurementDecoder) -> None:
+ if not self._pending_bin_events:
+ return
+ for payload_len, blob in self._pending_bin_events:
+ decoder.feed_bin(payload_len, blob)
+ self._pending_bin_events.clear()
+
+ async def _await_scan_terminal(self, saw_terminal: bool) -> None:
+ if saw_terminal:
+ return
+ await self._read_command_response()
+
+ def _reset_stream_state(self) -> None:
+ self._pending_bin_events.clear()
+ self._parser = _StreamParser(allow_bare_ascii=True)
+
+ async def _read_command_response(
+ self, max_iterations: int = 8, require_terminal: bool = True
+ ) -> List[str]:
+ """Read response frames and cache any binary payloads that arrive."""
+ frames: List[str] = []
+ saw_terminal = False
+ for _ in range(max_iterations):
+ chunk = await self.read_packet(128)
+ if not chunk:
+ break
+ for event in self._parser.feed(chunk):
+ if event.text is not None:
+ frames.append(event.text)
+ if is_terminal_frame(event.text):
+ saw_terminal = True
+ elif event.payload_len is not None and event.blob is not None:
+ self._pending_bin_events.append((event.payload_len, event.blob))
+ if not require_terminal and frames and not self._parser.has_pending_bin():
+ break
+ if require_terminal and saw_terminal and not self._parser.has_pending_bin():
+ break
+ if require_terminal and not saw_terminal:
+ await self._drain(1)
+ return frames
+
+ async def _recover_transport(self) -> None:
+ try:
+ await self.io.stop()
+ await asyncio.sleep(0.2)
+ await self.io.setup()
+ except Exception:
+ logger.warning("Transport recovery failed.", exc_info=True)
+ return
+ self._mode_capabilities.clear()
+ self._reset_stream_state()
+ await self._initialize_device()
+
+ async def _drain(self, attempts: int = 4) -> None:
+ """Read and discard a few packets to clear the stream."""
+ for _ in range(attempts):
+ data = await self.read_packet(128)
+ if not data:
+ break
diff --git a/pylabrobot/tecan/infinite/fluorescence_backend.py b/pylabrobot/tecan/infinite/fluorescence_backend.py
new file mode 100644
index 00000000000..b3144886380
--- /dev/null
+++ b/pylabrobot/tecan/infinite/fluorescence_backend.py
@@ -0,0 +1,166 @@
+"""Tecan Infinite 200 PRO fluorescence backend."""
+
+from __future__ import annotations
+
+import logging
+import time
+from dataclasses import dataclass
+from typing import List, Optional
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.capabilities.plate_reading.fluorescence.backend import FluorescenceBackend
+from pylabrobot.capabilities.plate_reading.fluorescence.standard import FluorescenceResult
+from pylabrobot.resources.plate import Plate
+from pylabrobot.resources.well import Well
+from pylabrobot.serializer import SerializableMixin
+
+from .driver import TecanInfiniteDriver
+from .protocol import _FluorescenceRunDecoder, format_plate_result
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TecanInfiniteFluorescenceParams(BackendParams):
+ """Tecan Infinite-specific parameters for fluorescence reads.
+
+ Args:
+ flashes: Number of flashes (reads) per well. Default 25.
+ integration_us: Integration time in microseconds. Default 20.
+ gain: PMT gain value (0-255). Default 100.
+    excitation_bandwidth: Excitation filter bandwidth in tenths of a nanometer (50 = 5.0 nm). Default 50.
+    emission_bandwidth: Emission filter bandwidth in tenths of a nanometer (200 = 20.0 nm). Default 200.
+ lag_us: Lag time in microseconds between excitation and measurement. Default 0.
+ """
+
+ flashes: int = 25
+ integration_us: int = 20
+ gain: int = 100
+ excitation_bandwidth: int = 50
+ emission_bandwidth: int = 200
+ lag_us: int = 0
+
+
+class TecanInfiniteFluorescenceBackend(FluorescenceBackend):
+ """Translates FluorescenceBackend interface into Tecan Infinite driver commands."""
+
+ def __init__(self, driver: TecanInfiniteDriver):
+ self.driver = driver
+
+ async def read_fluorescence(
+ self,
+ plate: Plate,
+ wells: List[Well],
+ excitation_wavelength: int,
+ emission_wavelength: int,
+ focal_height: float,
+ backend_params: Optional[SerializableMixin] = None,
+ ) -> List[FluorescenceResult]:
+ if not isinstance(backend_params, TecanInfiniteFluorescenceParams):
+ backend_params = TecanInfiniteFluorescenceParams()
+
+ if not 230 <= excitation_wavelength <= 850:
+ raise ValueError("Excitation wavelength must be between 230 nm and 850 nm.")
+ if not 230 <= emission_wavelength <= 850:
+ raise ValueError("Emission wavelength must be between 230 nm and 850 nm.")
+ if focal_height < 0:
+ raise ValueError("Focal height must be non-negative for fluorescence scans.")
+
+ ordered_wells = wells if wells else plate.get_all_items()
+ scan_wells = self.driver.scan_visit_order(ordered_wells, serpentine=True)
+
+ await self.driver.begin_run()
+ try:
+ await self._configure_fluorescence(
+ excitation_wavelength,
+ emission_wavelength,
+ focal_height,
+ flashes=backend_params.flashes,
+ integration_us=backend_params.integration_us,
+ gain=backend_params.gain,
+ excitation_bandwidth=backend_params.excitation_bandwidth,
+ emission_bandwidth=backend_params.emission_bandwidth,
+ lag_us=backend_params.lag_us,
+ )
+ decoder = _FluorescenceRunDecoder(len(scan_wells))
+
+ await self.driver.run_scan(
+ ordered_wells=ordered_wells,
+ decoder=decoder,
+ mode="Fluorescence",
+ step_loss_commands=[
+ "CHECK MTP.STEPLOSS",
+ "CHECK FI.TOP.STEPLOSS",
+ "CHECK FI.STEPLOSS.Z",
+ ],
+ serpentine=True,
+ scan_direction="UP",
+ )
+
+ if len(decoder.intensities) != len(scan_wells):
+ raise RuntimeError("Fluorescence decoder did not complete scan.")
+ intensities = decoder.intensities
+ matrix = format_plate_result(plate, scan_wells, intensities)
+ return [
+ FluorescenceResult(
+ data=matrix,
+ excitation_wavelength=excitation_wavelength,
+ emission_wavelength=emission_wavelength,
+ temperature=None,
+ timestamp=time.time(),
+ )
+ ]
+ finally:
+ await self.driver.end_run()
+
+ async def _configure_fluorescence(
+ self,
+ excitation_nm: int,
+ emission_nm: int,
+ focal_height: float,
+ *,
+ flashes: int,
+ integration_us: int,
+ gain: int,
+ excitation_bandwidth: int,
+ emission_bandwidth: int,
+ lag_us: int,
+ ) -> None:
+ ex_decitenth = int(round(excitation_nm * 10))
+ em_decitenth = int(round(emission_nm * 10))
+ reads_number = max(1, flashes)
+ beam_diameter = self.driver.capability_numeric("FI.TOP", "#BEAM DIAMETER", 3000)
+ z_position = int(round(focal_height * self.driver.counts_per_mm_z))
+
+    # The vendor control software sends the full FI configuration twice before PREPARE REF; mirror that.
+ for _ in range(2):
+ await self.driver.send_command("MODE FI.TOP", allow_timeout=True)
+ await self.driver.clear_mode_settings(excitation=True, emission=True)
+ await self.driver.send_command(
+ f"EXCITATION 0,FI,{ex_decitenth},{excitation_bandwidth},0", allow_timeout=True
+ )
+ await self.driver.send_command(
+ f"EMISSION 0,FI,{em_decitenth},{emission_bandwidth},0", allow_timeout=True
+ )
+ await self.driver.send_command(f"TIME 0,INTEGRATION={integration_us}", allow_timeout=True)
+ await self.driver.send_command(f"TIME 0,LAG={lag_us}", allow_timeout=True)
+ await self.driver.send_command("TIME 0,READDELAY=0", allow_timeout=True)
+ await self.driver.send_command(f"GAIN 0,VALUE={gain}", allow_timeout=True)
+ await self.driver.send_command(f"POSITION 0,Z={z_position}", allow_timeout=True)
+ await self.driver.send_command(f"BEAM DIAMETER={beam_diameter}", allow_timeout=True)
+ await self.driver.send_command("SCAN DIRECTION=UP", allow_timeout=True)
+ await self.driver.send_command("RATIO LABELS=1", allow_timeout=True)
+ await self.driver.send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True)
+ await self.driver.send_command(
+ f"EXCITATION 1,FI,{ex_decitenth},{excitation_bandwidth},0", allow_timeout=True
+ )
+ await self.driver.send_command(
+ f"EMISSION 1,FI,{em_decitenth},{emission_bandwidth},0", allow_timeout=True
+ )
+ await self.driver.send_command(f"TIME 1,INTEGRATION={integration_us}", allow_timeout=True)
+ await self.driver.send_command(f"TIME 1,LAG={lag_us}", allow_timeout=True)
+ await self.driver.send_command("TIME 1,READDELAY=0", allow_timeout=True)
+ await self.driver.send_command(f"GAIN 1,VALUE={gain}", allow_timeout=True)
+ await self.driver.send_command(f"POSITION 1,Z={z_position}", allow_timeout=True)
+ await self.driver.send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True)
+ await self.driver.send_command("PREPARE REF", allow_timeout=True, read_response=False)
diff --git a/pylabrobot/tecan/infinite/infinite.py b/pylabrobot/tecan/infinite/infinite.py
new file mode 100644
index 00000000000..04d2837de31
--- /dev/null
+++ b/pylabrobot/tecan/infinite/infinite.py
@@ -0,0 +1,78 @@
+"""Tecan Infinite 200 PRO plate reader device."""
+
+from __future__ import annotations
+
+from pylabrobot.capabilities.loading_tray import HasLoadingTray, LoadingTray
+from pylabrobot.capabilities.plate_reading.absorbance import Absorbance
+from pylabrobot.capabilities.plate_reading.fluorescence import Fluorescence
+from pylabrobot.capabilities.plate_reading.luminescence import Luminescence
+from pylabrobot.device import Device
+from pylabrobot.resources import Coordinate, Resource
+
+from .absorbance_backend import TecanInfiniteAbsorbanceBackend
+from .driver import TecanInfiniteDriver
+from .fluorescence_backend import TecanInfiniteFluorescenceBackend
+from .loading_tray_backend import TecanInfiniteLoadingTrayBackend
+from .luminescence_backend import TecanInfiniteLuminescenceBackend
+
+
+class TecanInfinite200Pro(Resource, Device, HasLoadingTray):
+ """Tecan Infinite 200 PRO plate reader.
+
+ Supports absorbance, fluorescence, and luminescence measurements.
+
+ Examples:
+ >>> reader = TecanInfinite200Pro(name="infinite")
+ >>> await reader.setup()
+ >>> results = await reader.absorbance.read(plate=my_plate, wavelength=600)
+ >>> await reader.stop()
+ """
+
+ def __init__(
+ self,
+ name: str,
+ counts_per_mm_x: float = 1_000,
+ counts_per_mm_y: float = 1_000,
+ counts_per_mm_z: float = 1_000,
+ size_x: float = 0.0,
+ size_y: float = 0.0,
+ size_z: float = 0.0,
+ ):
+ driver = TecanInfiniteDriver(
+ counts_per_mm_x=counts_per_mm_x,
+ counts_per_mm_y=counts_per_mm_y,
+ counts_per_mm_z=counts_per_mm_z,
+ )
+ Resource.__init__(
+ self,
+ name=name,
+ size_x=size_x,
+ size_y=size_y,
+ size_z=size_z,
+ model="Tecan Infinite 200 PRO",
+ category="plate_reader",
+ )
+ Device.__init__(self, driver=driver)
+ self.driver: TecanInfiniteDriver = driver
+
+ self.absorbance = Absorbance(backend=TecanInfiniteAbsorbanceBackend(driver))
+ self.fluorescence = Fluorescence(backend=TecanInfiniteFluorescenceBackend(driver))
+ self.luminescence = Luminescence(backend=TecanInfiniteLuminescenceBackend(driver))
+ self.loading_tray = LoadingTray(
+ backend=TecanInfiniteLoadingTrayBackend(driver),
+ name=name + "_loading_tray",
+ size_x=127.76,
+ size_y=85.48,
+ size_z=0,
+ child_location=Coordinate.zero(),
+ )
+ self._capabilities = [
+ self.absorbance,
+ self.fluorescence,
+ self.luminescence,
+ self.loading_tray,
+ ]
+ self.assign_child_resource(self.loading_tray, location=Coordinate.zero())
+
+ def serialize(self) -> dict:
+ return {**Resource.serialize(self), **Device.serialize(self)}
diff --git a/pylabrobot/tecan/infinite/infinite_tests.py b/pylabrobot/tecan/infinite/infinite_tests.py
new file mode 100644
index 00000000000..39003c9eb14
--- /dev/null
+++ b/pylabrobot/tecan/infinite/infinite_tests.py
@@ -0,0 +1,281 @@
+"""Tests for the Tecan Infinite 200 PRO protocol, driver geometry, and measurement backends."""
+
+import unittest
+from unittest.mock import AsyncMock, patch
+
+from pylabrobot.io.usb import USB
+from pylabrobot.resources import Coordinate, Plate, Well, create_ordered_items_2d
+from pylabrobot.tecan.infinite.driver import TecanInfiniteDriver
+from pylabrobot.tecan.infinite.protocol import (
+ _AbsorbanceRunDecoder,
+ _FluorescenceRunDecoder,
+ _LuminescenceRunDecoder,
+ _absorbance_od_calibrated,
+ _consume_leading_ascii_frame,
+ frame_command,
+ is_terminal_frame,
+)
+
+
+def _pack_u16(words):
+ return b"".join(int(word).to_bytes(2, "big") for word in words)
+
+
+def _bin_blob(payload):
+ payload_len = len(payload)
+ trailer = b"\x00\x00\x00\x00"
+ return payload_len, payload + trailer
+
+
+def _abs_calibration_blob(ex_decitenth, meas_dark, meas_bright, ref_dark, ref_bright):
+ header = _pack_u16([0, ex_decitenth])
+ item = (0).to_bytes(4, "big") + _pack_u16([0, 0, meas_dark, meas_bright, 0, ref_dark, ref_bright])
+ return _bin_blob(header + item)
+
+
+def _abs_data_blob(ex_decitenth, meas, ref):
+ payload = _pack_u16([0, ex_decitenth, 0, 0, 0, meas, ref])
+ return _bin_blob(payload)
+
+
+def _flr_calibration_blob(ex_decitenth, meas_dark, ref_dark, ref_bright):
+ words = [ex_decitenth, 0, 0, 0, 0, meas_dark, 0, ref_dark, ref_bright]
+ return _bin_blob(_pack_u16(words))
+
+
+def _flr_data_blob(ex_decitenth, em_decitenth, meas, ref):
+ words = [0, ex_decitenth, em_decitenth, 0, 0, 0, meas, ref]
+ return _bin_blob(_pack_u16(words))
+
+
+def _lum_data_blob(em_decitenth: int, intensity: int):
+ payload = bytearray(14)
+ payload[0:2] = (0).to_bytes(2, "big")
+ payload[2:4] = int(em_decitenth).to_bytes(2, "big")
+ payload[10:14] = int(intensity).to_bytes(4, "big", signed=True)
+ return _bin_blob(bytes(payload))
+
+
+def _make_test_plate():
+ plate = Plate(
+ "plate",
+ size_x=30,
+ size_y=20,
+ size_z=10,
+ ordered_items=create_ordered_items_2d(
+ Well,
+ num_items_x=3,
+ num_items_y=2,
+ dx=1,
+ dy=2,
+ dz=0,
+ item_dx=10,
+ item_dy=8,
+ size_x=4,
+ size_y=4,
+ size_z=5,
+ ),
+ )
+ plate.location = Coordinate.zero()
+ return plate
+
+
+# ---------------------------------------------------------------------------
+# Protocol tests
+# ---------------------------------------------------------------------------
+
+
+class TestProtocol(unittest.TestCase):
+ def test_frame_command(self):
+ framed = frame_command("A")
+ self.assertEqual(framed, b"\x02A\x03\x00\x00\x01\x40\x0d")
+
+ def test_consume_leading_ascii_frame(self):
+ buffer = bytearray(frame_command("ST") + b"XYZ")
+ consumed, text = _consume_leading_ascii_frame(buffer)
+ self.assertTrue(consumed)
+ self.assertEqual(text, "ST")
+ self.assertEqual(buffer, bytearray(b"XYZ"))
+
+ def test_terminal_frames(self):
+ self.assertTrue(is_terminal_frame("ST"))
+ self.assertTrue(is_terminal_frame("+"))
+ self.assertTrue(is_terminal_frame("-"))
+ self.assertTrue(is_terminal_frame("BY#T5000"))
+ self.assertFalse(is_terminal_frame("OK"))
+
+
+class TestDecoders(unittest.TestCase):
+ def test_absorbance_decoder(self):
+ decoder = _AbsorbanceRunDecoder(1)
+ cal_len, cal_blob = _abs_calibration_blob(6000, 0, 1000, 0, 1000)
+ decoder.feed_bin(cal_len, cal_blob)
+ self.assertIsNotNone(decoder.calibration)
+ data_len, data_blob = _abs_data_blob(6000, 500, 1000)
+ decoder.feed_bin(data_len, data_blob)
+ self.assertTrue(decoder.done)
+ od = _absorbance_od_calibrated(decoder.calibration, [(500, 1000)])
+ self.assertAlmostEqual(od, 0.3010299956639812)
+
+ def test_fluorescence_decoder(self):
+ decoder = _FluorescenceRunDecoder(1)
+ cal_len, cal_blob = _flr_calibration_blob(4850, 0, 0, 1000)
+ decoder.feed_bin(cal_len, cal_blob)
+ data_len, data_blob = _flr_data_blob(4850, 5200, 500, 1000)
+ decoder.feed_bin(data_len, data_blob)
+ self.assertTrue(decoder.done)
+ self.assertEqual(decoder.intensities[0], 500)
+
+ def test_luminescence_decoder(self):
+ decoder = _LuminescenceRunDecoder(1)
+ data_len, data_blob = _lum_data_blob(0, 42)
+ decoder.feed_bin(data_len, data_blob)
+ self.assertTrue(decoder.done)
+ self.assertEqual(decoder.measurements[0].intensity, 42)
+
+
+# ---------------------------------------------------------------------------
+# Driver geometry tests
+# ---------------------------------------------------------------------------
+
+
+class TestDriverGeometry(unittest.TestCase):
+ def setUp(self):
+ self.driver = TecanInfiniteDriver(counts_per_mm_x=1, counts_per_mm_y=1, counts_per_mm_z=1)
+ self.plate = _make_test_plate()
+
+ def test_scan_visit_order_serpentine(self):
+ order = self.driver.scan_visit_order(self.plate.get_all_items(), serpentine=True)
+ identifiers = [well.get_identifier() for well in order]
+ self.assertEqual(identifiers, ["A1", "A2", "A3", "B3", "B2", "B1"])
+
+ def test_scan_visit_order_linear(self):
+ order = self.driver.scan_visit_order(self.plate.get_all_items(), serpentine=False)
+ identifiers = [well.get_identifier() for well in order]
+ self.assertEqual(identifiers, ["A1", "A2", "A3", "B1", "B2", "B3"])
+
+ def test_map_well_to_stage(self):
+ stage_x, stage_y = self.driver.map_well_to_stage(self.plate.get_well("A1"))
+ self.assertEqual((stage_x, stage_y), (3, 8))
+ stage_x, stage_y = self.driver.map_well_to_stage(self.plate.get_well("B1"))
+ self.assertEqual((stage_x, stage_y), (3, 16))
+
+
+# ---------------------------------------------------------------------------
+# Backend integration tests
+# ---------------------------------------------------------------------------
+
+
+class TestAbsorbanceBackend(unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ self.mock_usb = AsyncMock(spec=USB)
+ self.mock_usb.setup = AsyncMock()
+ self.mock_usb.stop = AsyncMock()
+ self.mock_usb.write = AsyncMock()
+ self.mock_usb.read = AsyncMock(return_value=frame_command("ST"))
+ self.plate = _make_test_plate()
+
+ async def test_read_absorbance(self):
+ from pylabrobot.tecan.infinite.absorbance_backend import (
+ TecanInfiniteAbsorbanceBackend,
+ TecanInfiniteAbsorbanceParams,
+ )
+
+ driver = TecanInfiniteDriver(counts_per_mm_x=1000, counts_per_mm_y=1000, io=self.mock_usb)
+ driver._ready = True
+ backend = TecanInfiniteAbsorbanceBackend(driver)
+
+ async def mock_await(decoder, row_count, mode):
+ cal_len, cal_blob = _abs_calibration_blob(6000, 0, 1000, 0, 1000)
+ decoder.feed_bin(cal_len, cal_blob)
+ for _ in range(row_count):
+ data_len, data_blob = _abs_data_blob(6000, 500, 1000)
+ decoder.feed_bin(data_len, data_blob)
+
+ with patch.object(driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(driver, "_await_scan_terminal", new_callable=AsyncMock):
+ results = await backend.read_absorbance(
+ plate=self.plate, wells=[], wavelength=600,
+ backend_params=TecanInfiniteAbsorbanceParams(),
+ )
+
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0].wavelength, 600)
+ self.assertIsNotNone(results[0].data)
+ self.assertAlmostEqual(results[0].data[0][0], 0.3010299956639812)
+
+
+class TestFluorescenceBackend(unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ self.mock_usb = AsyncMock(spec=USB)
+ self.mock_usb.setup = AsyncMock()
+ self.mock_usb.stop = AsyncMock()
+ self.mock_usb.write = AsyncMock()
+ self.mock_usb.read = AsyncMock(return_value=frame_command("ST"))
+ self.plate = _make_test_plate()
+
+ async def test_read_fluorescence(self):
+ from pylabrobot.tecan.infinite.fluorescence_backend import (
+ TecanInfiniteFluorescenceBackend,
+ TecanInfiniteFluorescenceParams,
+ )
+
+ driver = TecanInfiniteDriver(counts_per_mm_x=1000, counts_per_mm_y=1000, io=self.mock_usb)
+ driver._ready = True
+ backend = TecanInfiniteFluorescenceBackend(driver)
+
+ async def mock_await(decoder, row_count, mode):
+ cal_len, cal_blob = _flr_calibration_blob(4850, 0, 0, 1000)
+ decoder.feed_bin(cal_len, cal_blob)
+ for _ in range(row_count):
+ data_len, data_blob = _flr_data_blob(4850, 5200, 500, 1000)
+ decoder.feed_bin(data_len, data_blob)
+
+ with patch.object(driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(driver, "_await_scan_terminal", new_callable=AsyncMock):
+ results = await backend.read_fluorescence(
+ plate=self.plate, wells=[], excitation_wavelength=485,
+ emission_wavelength=520, focal_height=20.0,
+ backend_params=TecanInfiniteFluorescenceParams(),
+ )
+
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0].excitation_wavelength, 485)
+ self.assertEqual(results[0].emission_wavelength, 520)
+
+
+class TestLuminescenceBackend(unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ self.mock_usb = AsyncMock(spec=USB)
+ self.mock_usb.setup = AsyncMock()
+ self.mock_usb.stop = AsyncMock()
+ self.mock_usb.write = AsyncMock()
+ self.mock_usb.read = AsyncMock(return_value=frame_command("ST"))
+ self.plate = _make_test_plate()
+
+ async def test_read_luminescence(self):
+ from pylabrobot.tecan.infinite.luminescence_backend import (
+ TecanInfiniteLuminescenceBackend,
+ TecanInfiniteLuminescenceParams,
+ )
+
+ driver = TecanInfiniteDriver(counts_per_mm_x=1000, counts_per_mm_y=1000, io=self.mock_usb)
+ driver._ready = True
+ backend = TecanInfiniteLuminescenceBackend(driver)
+
+ async def mock_await(decoder, row_count, mode):
+ cal_blob = bytes(14)
+ decoder.feed_bin(10, cal_blob)
+ for _ in range(row_count):
+ data_len, data_blob = _lum_data_blob(0, 1000)
+ decoder.feed_bin(data_len, data_blob)
+
+ with patch.object(driver, "_await_measurements", side_effect=mock_await):
+ with patch.object(driver, "_await_scan_terminal", new_callable=AsyncMock):
+ results = await backend.read_luminescence(
+ plate=self.plate, wells=[], focal_height=14.62,
+ backend_params=TecanInfiniteLuminescenceParams(),
+ )
+
+ self.assertEqual(len(results), 1)
+ self.assertIsNotNone(results[0].data)
diff --git a/pylabrobot/tecan/infinite/loading_tray_backend.py b/pylabrobot/tecan/infinite/loading_tray_backend.py
new file mode 100644
index 00000000000..8ceed2c3c07
--- /dev/null
+++ b/pylabrobot/tecan/infinite/loading_tray_backend.py
@@ -0,0 +1,21 @@
+from typing import Optional
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.capabilities.loading_tray.backend import LoadingTrayBackend
+
+from .driver import TecanInfiniteDriver
+
+
class TecanInfiniteLoadingTrayBackend(LoadingTrayBackend):
  """Loading tray backend for Tecan Infinite plate readers.

  Moves the motorized plate carrier in or out via an ABSOLUTE MTP move
  followed by a BY#T5000 frame (presumably a timed wait/guard frame —
  confirm against the device protocol).
  """

  def __init__(self, driver: TecanInfiniteDriver):
    # Low-level driver shared with the measurement backends.
    self._driver = driver

  async def open(self, backend_params: Optional[BackendParams] = None):
    """Eject the tray so a plate can be placed or removed."""
    for command in ("ABSOLUTE MTP,OUT", "BY#T5000"):
      await self._driver.send_command(command)

  async def close(self, backend_params: Optional[BackendParams] = None):
    """Retract the tray into the reader."""
    for command in ("ABSOLUTE MTP,IN", "BY#T5000"):
      await self._driver.send_command(command)
diff --git a/pylabrobot/tecan/infinite/luminescence_backend.py b/pylabrobot/tecan/infinite/luminescence_backend.py
new file mode 100644
index 00000000000..8a11950f5f7
--- /dev/null
+++ b/pylabrobot/tecan/infinite/luminescence_backend.py
@@ -0,0 +1,128 @@
+"""Tecan Infinite 200 PRO luminescence backend."""
+
+from __future__ import annotations
+
+import logging
+import time
+from dataclasses import dataclass
+from typing import List, Optional
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.capabilities.plate_reading.luminescence.backend import LuminescenceBackend
+from pylabrobot.capabilities.plate_reading.luminescence.standard import LuminescenceResult
+from pylabrobot.resources.plate import Plate
+from pylabrobot.resources.well import Well
+from pylabrobot.serializer import SerializableMixin
+
+from .driver import TecanInfiniteDriver
+from .protocol import (
+ _LuminescenceRunDecoder,
+ _integration_microseconds_to_seconds,
+ format_plate_result,
+)
+
+logger = logging.getLogger(__name__)
+
+
@dataclass
class TecanInfiniteLuminescenceParams(BackendParams):
  """Tecan Infinite-specific parameters for luminescence reads.

  Args:
    flashes: Number of flashes (reads) per well. Default 25.
    dark_integration_us: Dark integration time in microseconds. Default 3,000,000.
    meas_integration_us: Measurement integration time in microseconds. Default 1,000,000.
  """

  # Per-well read count; forwarded to the READS n,NUMBER=... commands (the
  # backend clamps this to at least 1).
  flashes: int = 25
  # Integration time of the dark (reference) measurement slot, in microseconds.
  dark_integration_us: int = 3_000_000
  # Integration time of the sample measurement slot, in microseconds.
  meas_integration_us: int = 1_000_000
+
+
class TecanInfiniteLuminescenceBackend(LuminescenceBackend):
  """Translates LuminescenceBackend interface into Tecan Infinite driver commands."""

  def __init__(self, driver: TecanInfiniteDriver):
    # Shared low-level driver; begin_run/end_run bracket each measurement run.
    self.driver = driver

  async def read_luminescence(
    self,
    plate: Plate,
    wells: List[Well],
    focal_height: float,
    backend_params: Optional[SerializableMixin] = None,
  ) -> List[LuminescenceResult]:
    """Perform a luminescence scan of *plate* and return one plate-wide result.

    Args:
      plate: The plate to read.
      wells: Wells to read; an empty list means all wells on the plate.
      focal_height: Z focus height (mm); must be non-negative.
      backend_params: Optional TecanInfiniteLuminescenceParams; any other
        value (including None) is silently replaced with defaults.

    Returns:
      A one-element list containing the decoded intensity matrix.

    Raises:
      ValueError: If focal_height is negative.
      RuntimeError: If the device delivered fewer measurements than wells.
    """
    # Wrong-typed or missing params silently fall back to defaults.
    if not isinstance(backend_params, TecanInfiniteLuminescenceParams):
      backend_params = TecanInfiniteLuminescenceParams()

    if focal_height < 0:
      raise ValueError("Focal height must be non-negative for luminescence scans.")

    # Empty wells list means "whole plate"; order wells as the device will
    # actually visit them (non-serpentine).
    ordered_wells = wells if wells else plate.get_all_items()
    scan_wells = self.driver.scan_visit_order(ordered_wells, serpentine=False)

    dark_integration = backend_params.dark_integration_us
    meas_integration = backend_params.meas_integration_us

    await self.driver.begin_run()
    try:
      await self._configure_luminescence(
        dark_integration, meas_integration, focal_height, flashes=backend_params.flashes
      )

      # The decoder needs both integration times (in seconds) to convert raw
      # counts into dark-corrected count rates.
      decoder = _LuminescenceRunDecoder(
        len(scan_wells),
        dark_integration_s=_integration_microseconds_to_seconds(dark_integration),
        meas_integration_s=_integration_microseconds_to_seconds(meas_integration),
      )

      await self.driver.run_scan(
        ordered_wells=ordered_wells,
        decoder=decoder,
        mode="Luminescence",
        step_loss_commands=["CHECK MTP.STEPLOSS", "CHECK LUM.STEPLOSS"],
        serpentine=False,
        scan_direction="UP",
      )

      if len(decoder.measurements) != len(scan_wells):
        raise RuntimeError("Luminescence decoder did not complete scan.")
      intensities = [measurement.intensity for measurement in decoder.measurements]
      matrix = format_plate_result(plate, scan_wells, intensities)
      return [
        LuminescenceResult(
          data=matrix,
          temperature=None,  # this backend does not read the chamber temperature
          timestamp=time.time(),
        )
      ]
    finally:
      # Always release the device, even if configuration or the scan failed.
      await self.driver.end_run()

  async def _configure_luminescence(
    self,
    dark_integration: int,
    meas_integration: int,
    focal_height: float,
    *,
    flashes: int,
  ) -> None:
    """Send the firmware command sequence that sets up a luminescence run.

    Individual command semantics are not documented here; preserve the
    command order when modifying.
    """
    await self.driver.send_command("MODE LUM")
    await self.driver.send_command("CHECK LUM.FIBER")
    await self.driver.send_command("CHECK LUM.LID")
    await self.driver.send_command("CHECK LUM.STEPLOSS")
    # NOTE(review): MODE LUM is sent a second time after the checks — confirm
    # whether the firmware requires the repeat.
    await self.driver.send_command("MODE LUM")
    reads_number = max(1, flashes)  # at least one read per well
    # Convert focal height from mm to motor counts for the Z axis.
    z_position = int(round(focal_height * self.driver.counts_per_mm_z))
    await self.driver.clear_mode_settings(emission=True)
    await self.driver.send_command(f"POSITION LUM,Z={z_position}", allow_timeout=True)
    # Slot 0 carries the dark (reference) integration settings.
    await self.driver.send_command(f"TIME 0,INTEGRATION={dark_integration}", allow_timeout=True)
    await self.driver.send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True)
    await self.driver.send_command("SCAN DIRECTION=UP", allow_timeout=True)
    await self.driver.send_command("RATIO LABELS=1", allow_timeout=True)
    # Slot 1 carries the sample measurement; EMPTY presumably selects no
    # emission filter — confirm against the device protocol.
    await self.driver.send_command("EMISSION 1,EMPTY,0,0,0", allow_timeout=True)
    await self.driver.send_command(f"TIME 1,INTEGRATION={meas_integration}", allow_timeout=True)
    await self.driver.send_command("TIME 1,READDELAY=0", allow_timeout=True)
    await self.driver.send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True)
    await self.driver.send_command("#EMISSION ATTENUATION", allow_timeout=True)
    await self.driver.send_command("PREPARE REF", allow_timeout=True, read_response=False)
diff --git a/pylabrobot/tecan/infinite/protocol.py b/pylabrobot/tecan/infinite/protocol.py
new file mode 100644
index 00000000000..00eddd521e7
--- /dev/null
+++ b/pylabrobot/tecan/infinite/protocol.py
@@ -0,0 +1,612 @@
+"""Tecan Infinite 200 PRO protocol utilities.
+
+Pure functions for framing, stream parsing, binary decoding, and calibration math.
+No I/O -- used by both the driver and capability backends.
+"""
+
+from __future__ import annotations
+
+import math
+import re
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import List, Optional, Sequence, Tuple
+
+from pylabrobot.io.binary import Reader
+from pylabrobot.resources.plate import Plate
+from pylabrobot.resources.well import Well
+
# ASCII frame announcing a binary payload: "<N>,BIN:" means N payload bytes follow.
BIN_RE = re.compile(r"^(\d+),BIN:$")

# (x, y) stage position — presumably in motor counts; confirm against the driver.
StagePosition = Tuple[int, int]
+
+
+# ---------------------------------------------------------------------------
+# Framing
+# ---------------------------------------------------------------------------
+
+
def frame_command(command: str) -> bytes:
  """Wrap *command* in the Infinite's STX/ETX frame with a length/XOR-checksum trailer."""
  payload = command.encode("ascii")
  running = 0x01  # checksum seed; the final value is XOR of all payload bytes with 0x01
  for value in payload:
    running ^= value
  return b"".join(
    [
      b"\x02",
      payload,
      b"\x03\x00\x00",
      bytes([len(payload) & 0xFF, running & 0xFF]),
      b"\x0d",
    ]
  )
+
+
def is_terminal_frame(text: str) -> bool:
  """Return True if the ASCII frame marks the end of a command exchange."""
  if text.startswith("BY#T"):
    return True
  return text in ("ST", "+", "-")
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _integration_microseconds_to_seconds(value: int) -> float:
+ return value / 1_000_000.0
+
+
def format_plate_result(
  plate: Plate, wells: Sequence[Well], values: Sequence[float]
) -> List[List[Optional[float]]]:
  """Arrange per-well values into a row-major ``[row][col]`` grid matching the plate.

  Positions with no reading stay ``None``; wells whose row/column fall
  outside the plate bounds are silently skipped.
  """
  rows, cols = plate.num_items_y, plate.num_items_x
  grid: List[List[Optional[float]]] = [[None] * cols for _ in range(rows)]
  for well, value in zip(wells, values):
    row, col = well.get_row(), well.get_column()
    if 0 <= row < rows and 0 <= col < cols:
      grid[row][col] = float(value)
  return grid
+
+
def _split_payload_and_trailer(
  payload_len: int, blob: bytes
) -> Optional[Tuple[bytes, Tuple[int, int]]]:
  """Split a binary frame into its payload and the two big-endian u16 trailer words.

  Returns None when *blob* is not exactly payload + 4 trailer bytes long.
  """
  if len(blob) != payload_len + 4:
    return None
  trailer = Reader(blob[payload_len:], little_endian=False)
  return blob[:payload_len], (trailer.u16(), trailer.u16())
+
+
+# ---------------------------------------------------------------------------
+# Stream parsing
+# ---------------------------------------------------------------------------
+
+
+def _consume_leading_ascii_frame(buffer: bytearray) -> Tuple[bool, Optional[str]]:
+ """Remove a leading STX...ETX ASCII frame if present."""
+ if not buffer or buffer[0] != 0x02:
+ return False, None
+ end = buffer.find(b"\x03", 1)
+ if end == -1:
+ return False, None
+ if len(buffer) < end + 5:
+ return False, None
+ text = buffer[1:end].decode("ascii", "ignore")
+ del buffer[: end + 5]
+ if buffer and buffer[0] == 0x0D:
+ del buffer[0]
+ return True, text
+
+
+def _consume_status_frame(buffer: bytearray, length: int) -> bool:
+ """Drop a leading ESC-prefixed status frame if present."""
+ if len(buffer) >= length and buffer[0] == 0x1B:
+ del buffer[:length]
+ return True
+ return False
+
+
@dataclass
class _StreamEvent:
  """Parsed stream event (ASCII or binary).

  Exactly one of ``text`` or the (``payload_len``, ``blob``) pair is set.
  """
  # ASCII frame contents (ASCII events only).
  text: Optional[str] = None
  # Declared payload length from the "N,BIN:" announcement (binary events only).
  payload_len: Optional[int] = None
  # Raw payload plus 4-byte trailer (binary events only).
  blob: Optional[bytes] = None
+
+
class _StreamParser:
  """Parse mixed ASCII and binary packets from the reader.

  The device interleaves three kinds of traffic on one byte stream:
  STX/ETX-framed ASCII frames, optional ESC-prefixed fixed-length status
  frames, and binary blobs announced by an ASCII "N,BIN:" frame. Raw chunks
  are buffered and complete events are emitted in arrival order.
  """

  def __init__(
    self,
    *,
    status_frame_len: Optional[int] = None,
    allow_bare_ascii: bool = False,
  ) -> None:
    # status_frame_len: length of ESC-prefixed status frames to drop; None disables.
    # allow_bare_ascii: emit unframed printable ASCII as a text event.
    self._buffer = bytearray()
    self._pending_bin: Optional[int] = None
    self._status_frame_len = status_frame_len
    self._allow_bare_ascii = allow_bare_ascii

  def has_pending_bin(self) -> bool:
    """Return True if a binary payload length is pending."""
    return self._pending_bin is not None

  def feed(self, chunk: bytes) -> List[_StreamEvent]:
    """Feed raw bytes and return newly parsed events."""
    self._buffer.extend(chunk)
    events: List[_StreamEvent] = []
    progressed = True
    while progressed:
      progressed = False
      if self._pending_bin is not None:
        # A "N,BIN:" announcement was seen: wait for N payload bytes plus
        # the 4-byte trailer before emitting a binary event.
        need = self._pending_bin + 4
        if len(self._buffer) < need:
          break
        blob = bytes(self._buffer[:need])
        del self._buffer[:need]
        events.append(_StreamEvent(payload_len=self._pending_bin, blob=blob))
        self._pending_bin = None
        progressed = True
        continue
      if self._status_frame_len and _consume_status_frame(self._buffer, self._status_frame_len):
        progressed = True
        continue
      consumed, text = _consume_leading_ascii_frame(self._buffer)
      if consumed:
        events.append(_StreamEvent(text=text))
        if text:
          m = BIN_RE.match(text)
          if m:
            # The next bytes on the wire are a binary payload of this length.
            self._pending_bin = int(m.group(1))
        progressed = True
        continue
      if self._allow_bare_ascii and self._buffer and all(32 <= b <= 126 for b in self._buffer):
        text = self._buffer.decode("ascii", "ignore")
        self._buffer.clear()
        events.append(_StreamEvent(text=text))
        progressed = True
        continue
    return events
+
+
+# ---------------------------------------------------------------------------
+# Measurement decoder base
+# ---------------------------------------------------------------------------
+
+
class _MeasurementDecoder(ABC):
  """Shared incremental decoder for Infinite measurement streams.

  Subclasses recognize mode-specific binary packet lengths and turn packets
  into decoded measurements; this base handles stream parsing and tracks
  progress toward the expected measurement count.
  """

  # Length of ESC-prefixed status frames to discard, or None to disable.
  STATUS_FRAME_LEN: Optional[int] = None

  def __init__(self, expected: int) -> None:
    # expected: number of measurements (wells) the scan should produce.
    self.expected = expected
    self._terminal_seen = False
    self._parser = _StreamParser(status_frame_len=self.STATUS_FRAME_LEN)

  @property
  @abstractmethod
  def count(self) -> int:
    """Return number of decoded measurements so far."""

  @property
  def done(self) -> bool:
    """True once at least ``expected`` measurements have been decoded."""
    return self.count >= self.expected

  def pop_terminal(self) -> bool:
    """Return and clear the terminal-seen flag (one-shot)."""
    seen = self._terminal_seen
    self._terminal_seen = False
    return seen

  def feed(self, chunk: bytes) -> None:
    """Feed raw stream bytes, decoding any complete binary packets."""
    for event in self._parser.feed(chunk):
      if event.text is not None:
        # NOTE(review): only "ST" is treated as terminal here, while
        # is_terminal_frame also accepts "+", "-" and "BY#T..." — confirm the
        # narrower check is intentional for measurement streams.
        if event.text == "ST":
          self._terminal_seen = True
      elif event.payload_len is not None and event.blob is not None:
        self.feed_bin(event.payload_len, event.blob)

  def feed_bin(self, payload_len: int, blob: bytes) -> None:
    """Dispatch a binary packet to the subclass if it claims this length."""
    if self._should_consume_bin(payload_len):
      self._handle_bin(payload_len, blob)

  def _should_consume_bin(self, _payload_len: int) -> bool:
    # Subclass hook: return True for packet lengths this decoder understands.
    return False

  def _handle_bin(self, _payload_len: int, _blob: bytes) -> None:
    # Subclass hook: decode a packet accepted by _should_consume_bin.
    return None
+
+
+# ---------------------------------------------------------------------------
+# Absorbance decoding & calibration
+# ---------------------------------------------------------------------------
+
+
+def _is_abs_calibration_len(payload_len: int) -> bool:
+ return payload_len >= 22 and (payload_len - 4) % 18 == 0
+
+
+def _is_abs_data_len(payload_len: int) -> bool:
+ return payload_len >= 14 and (payload_len - 4) % 10 == 0
+
+
@dataclass(frozen=True)
class _AbsorbanceCalibrationItem:
  """One 18-byte calibration record from an ABS calibration packet."""

  # Device ticker timestamp (u32 overflow counter + u16 counter) — role unconfirmed.
  ticker_overflows: int
  ticker_counter: int
  # Measurement-channel gain setting and dark/bright calibration counts.
  meas_gain: int
  meas_dark: int
  meas_bright: int
  # Reference-channel gain setting and dark/bright calibration counts.
  ref_gain: int
  ref_dark: int
  ref_bright: int
+
+
@dataclass(frozen=True)
class _AbsorbanceCalibration:
  """Decoded ABS calibration packet: header value plus per-read calibration items."""

  # Presumably the measurement wavelength in nm from the packet header — confirm.
  ex: int
  items: List[_AbsorbanceCalibrationItem]
+
+
def _decode_abs_calibration(payload_len: int, blob: bytes) -> Optional[_AbsorbanceCalibration]:
  """Decode an ABS calibration packet: a 4-byte header followed by 18-byte items.

  Returns None when the blob length or layout does not match.
  """
  split = _split_payload_and_trailer(payload_len, blob)
  if split is None:
    return None
  payload, _ = split
  if len(payload) < 4 + 18:
    return None
  if (len(payload) - 4) % 18 != 0:
    return None
  reader = Reader(payload, little_endian=False)
  reader.raw_bytes(2)  # 2 skipped header bytes — meaning unconfirmed.
  ex = reader.u16()  # presumably wavelength in nm — confirm.
  items: List[_AbsorbanceCalibrationItem] = []
  while reader.has_remaining():
    items.append(
      _AbsorbanceCalibrationItem(
        ticker_overflows=reader.u32(),
        ticker_counter=reader.u16(),
        meas_gain=reader.u16(),
        meas_dark=reader.u16(),
        meas_bright=reader.u16(),
        ref_gain=reader.u16(),
        ref_dark=reader.u16(),
        ref_bright=reader.u16(),
      )
    )
  return _AbsorbanceCalibration(ex=ex, items=items)
+
+
def _decode_abs_data(
  payload_len: int, blob: bytes
) -> Optional[Tuple[int, int, List[Tuple[int, int]]]]:
  """Decode an ABS data packet into ``(label, ex, [(meas, ref), ...])``.

  Each 10-byte item is 6 skipped bytes followed by u16 meas and u16 ref
  counts. Returns None on any layout mismatch.
  """
  split = _split_payload_and_trailer(payload_len, blob)
  if split is None:
    return None
  payload, _ = split
  if len(payload) < 4:
    return None
  reader = Reader(payload, little_endian=False)
  label = reader.u16()
  ex = reader.u16()
  items: List[Tuple[int, int]] = []
  while reader.offset() + 10 <= len(payload):
    reader.raw_bytes(6)  # per-read prefix — skipped; meaning unconfirmed.
    meas = reader.u16()
    ref = reader.u16()
    items.append((meas, ref))
  if reader.offset() != len(payload):
    # Trailing bytes that don't form a whole item invalidate the packet.
    return None
  return label, ex, items
+
+
+def _absorbance_od_calibrated(
+ cal: _AbsorbanceCalibration, meas_ref_items: List[Tuple[int, int]], od_max: float = 4.0
+) -> float:
+ if not cal.items:
+ raise ValueError("ABS calibration packet contained no calibration items.")
+
+ min_corr_trans = math.pow(10.0, -od_max)
+
+ if len(cal.items) == len(meas_ref_items) and len(cal.items) > 1:
+ corr_trans_vals: List[float] = []
+ for (meas, ref), cal_item in zip(meas_ref_items, cal.items):
+ denom_corr = cal_item.meas_bright - cal_item.meas_dark
+ if denom_corr == 0:
+ continue
+ f_corr = (cal_item.ref_bright - cal_item.ref_dark) / denom_corr
+ denom = ref - cal_item.ref_dark
+ if denom == 0:
+ continue
+ corr_trans_vals.append(((meas - cal_item.meas_dark) / denom) * f_corr)
+ if not corr_trans_vals:
+ raise ZeroDivisionError("ABS invalid: no usable reads after per-read calibration.")
+ corr_trans = max(sum(corr_trans_vals) / len(corr_trans_vals), min_corr_trans)
+ return float(-math.log10(corr_trans))
+
+ cal0 = cal.items[0]
+ denom_corr = cal0.meas_bright - cal0.meas_dark
+ if denom_corr == 0:
+ raise ZeroDivisionError("ABS calibration invalid: meas_bright == meas_dark")
+ f_corr = (cal0.ref_bright - cal0.ref_dark) / denom_corr
+
+ trans_vals: List[float] = []
+ for meas, ref in meas_ref_items:
+ denom = ref - cal0.ref_dark
+ if denom == 0:
+ continue
+ trans_vals.append((meas - cal0.meas_dark) / denom)
+ if not trans_vals:
+ raise ZeroDivisionError("ABS invalid: all ref reads equal ref_dark")
+
+ trans_mean = sum(trans_vals) / len(trans_vals)
+ corr_trans = max(trans_mean * f_corr, min_corr_trans)
+ return float(-math.log10(corr_trans))
+
+
@dataclass
class _AbsorbanceMeasurement:
  """One decoded absorbance well reading."""

  # Headline sample/reference counts (the packet's first read pair, or 0/0).
  sample: int
  reference: int
  # All (meas, ref) pairs from the packet, kept for calibrated OD computation.
  items: Optional[List[Tuple[int, int]]] = None
+
+
class _AbsorbanceRunDecoder(_MeasurementDecoder):
  """Incrementally decode absorbance measurement frames.

  Keeps the first calibration packet for later OD conversion and appends one
  measurement per data packet.
  """

  # Absorbance scans emit 31-byte ESC-prefixed status frames.
  STATUS_FRAME_LEN = 31

  def __init__(self, expected: int) -> None:
    super().__init__(expected)
    self.measurements: List[_AbsorbanceMeasurement] = []
    self._calibration: Optional[_AbsorbanceCalibration] = None

  @property
  def count(self) -> int:
    return len(self.measurements)

  @property
  def calibration(self) -> Optional[_AbsorbanceCalibration]:
    """The first calibration packet seen in the stream, if any."""
    return self._calibration

  def _should_consume_bin(self, payload_len: int) -> bool:
    return _is_abs_calibration_len(payload_len) or _is_abs_data_len(payload_len)

  def _handle_bin(self, payload_len: int, blob: bytes) -> None:
    # NOTE: lengths where (payload_len - 4) is a multiple of both 18 and 10
    # (e.g. 94) match both predicates; they are treated as calibration since
    # that branch is checked first.
    if _is_abs_calibration_len(payload_len):
      # Only the first calibration packet is kept; later ones are ignored.
      if self._calibration is not None:
        return
      cal = _decode_abs_calibration(payload_len, blob)
      if cal is not None:
        self._calibration = cal
      return
    if _is_abs_data_len(payload_len):
      data = _decode_abs_data(payload_len, blob)
      if data is None:
        return
      _label, _ex, items = data
      # First pair becomes the headline sample/reference; all pairs are kept.
      sample, reference = items[0] if items else (0, 0)
      self.measurements.append(
        _AbsorbanceMeasurement(sample=sample, reference=reference, items=items)
      )
+
+
+# ---------------------------------------------------------------------------
+# Fluorescence decoding & calibration
+# ---------------------------------------------------------------------------
+
+
@dataclass(frozen=True)
class _FluorescenceCalibration:
  """Decoded fluorescence calibration packet used for reference correction."""

  # Presumably the excitation wavelength in nm — confirm.
  ex: int
  # Dark level of the measurement channel.
  meas_dark: int
  # Dark and bright levels of the reference channel.
  ref_dark: int
  ref_bright: int
+
+
def _decode_flr_calibration(payload_len: int, blob: bytes) -> Optional[_FluorescenceCalibration]:
  """Decode the fixed 18-byte fluorescence calibration packet, or None on mismatch."""
  split = _split_payload_and_trailer(payload_len, blob)
  if split is None:
    return None
  payload, _ = split
  if len(payload) != 18:
    return None
  reader = Reader(payload, little_endian=False)
  ex = reader.u16()
  reader.raw_bytes(8)  # 8 skipped bytes — layout/meaning unconfirmed.
  meas_dark = reader.u16()
  reader.raw_bytes(2)  # 2 skipped bytes — unconfirmed.
  ref_dark = reader.u16()
  ref_bright = reader.u16()
  return _FluorescenceCalibration(
    ex=ex, meas_dark=meas_dark, ref_dark=ref_dark, ref_bright=ref_bright,
  )
+
+
def _decode_flr_data(
  payload_len: int, blob: bytes
) -> Optional[Tuple[int, int, int, List[Tuple[int, int]]]]:
  """Decode a fluorescence data packet into ``(label, ex, em, [(meas, ref), ...])``.

  Each 10-byte item is 6 skipped bytes followed by u16 meas and u16 ref
  counts. Returns None on any layout mismatch.
  """
  split = _split_payload_and_trailer(payload_len, blob)
  if split is None:
    return None
  payload, _ = split
  if len(payload) < 6:
    return None
  reader = Reader(payload, little_endian=False)
  label = reader.u16()
  ex = reader.u16()
  em = reader.u16()
  items: List[Tuple[int, int]] = []
  while reader.offset() + 10 <= len(payload):
    reader.raw_bytes(6)  # per-read prefix — skipped; meaning unconfirmed.
    meas = reader.u16()
    ref = reader.u16()
    items.append((meas, ref))
  if reader.offset() != len(payload):
    # Trailing bytes that don't form a whole item invalidate the packet.
    return None
  return label, ex, em, items
+
+
+def _fluorescence_corrected(
+ cal: _FluorescenceCalibration, meas_ref_items: List[Tuple[int, int]]
+) -> int:
+ if not meas_ref_items:
+ return 0
+ meas_mean = sum(m for m, _ in meas_ref_items) / len(meas_ref_items)
+ ref_mean = sum(r for _, r in meas_ref_items) / len(meas_ref_items)
+ denom = ref_mean - cal.ref_dark
+ if denom == 0:
+ return 0
+ corr = (meas_mean - cal.meas_dark) * (cal.ref_bright - cal.ref_dark) / denom
+ return int(round(corr))
+
+
class _FluorescenceRunDecoder(_MeasurementDecoder):
  """Incrementally decode fluorescence measurement frames.

  Keeps a single 18-byte calibration packet; each data packet yields one
  reference-corrected (or, without calibration, raw mean) intensity.
  """

  # Fluorescence scans emit 31-byte ESC-prefixed status frames.
  STATUS_FRAME_LEN = 31

  def __init__(self, expected_wells: int) -> None:
    super().__init__(expected_wells)
    self._intensities: List[int] = []
    self._calibration: Optional[_FluorescenceCalibration] = None

  @property
  def count(self) -> int:
    return len(self._intensities)

  @property
  def intensities(self) -> List[int]:
    """Decoded per-well intensities in scan order."""
    return self._intensities

  def _should_consume_bin(self, payload_len: int) -> bool:
    # 18-byte payloads are calibration packets; a 6-byte header plus n
    # 10-byte read items is a data packet.
    if payload_len == 18:
      return True
    if payload_len >= 16 and (payload_len - 6) % 10 == 0:
      return True
    return False

  def _handle_bin(self, payload_len: int, blob: bytes) -> None:
    if payload_len == 18:
      cal = _decode_flr_calibration(payload_len, blob)
      if cal is not None:
        self._calibration = cal
      return
    data = _decode_flr_data(payload_len, blob)
    if data is None:
      return
    _label, _ex, _em, items = data
    if self._calibration is not None:
      intensity = _fluorescence_corrected(self._calibration, items)
    else:
      # No calibration seen yet: fall back to the uncorrected mean count.
      if not items:
        intensity = 0
      else:
        intensity = int(round(sum(m for m, _ in items) / len(items)))
    self._intensities.append(intensity)
+
+
+# ---------------------------------------------------------------------------
+# Luminescence decoding & calibration
+# ---------------------------------------------------------------------------
+
+
@dataclass(frozen=True)
class _LuminescenceCalibration:
  """Decoded luminescence calibration packet."""

  # Dark reference count (i32 from the packet), accumulated over the dark
  # integration window.
  ref_dark: int
+
+
def _decode_lum_calibration(payload_len: int, blob: bytes) -> Optional[_LuminescenceCalibration]:
  """Decode the fixed 10-byte luminescence calibration packet, or None on mismatch."""
  split = _split_payload_and_trailer(payload_len, blob)
  if split is None:
    return None
  payload, _ = split
  if len(payload) != 10:
    return None
  reader = Reader(payload, little_endian=False)
  reader.raw_bytes(6)  # 6 skipped header bytes — meaning unconfirmed.
  return _LuminescenceCalibration(ref_dark=reader.i32())
+
+
def _decode_lum_data(payload_len: int, blob: bytes) -> Optional[Tuple[int, int, List[int]]]:
  """Decode a luminescence data packet into ``(label, em, [count, ...])``.

  Each 10-byte item is 6 skipped bytes followed by an i32 photon count.
  Returns None on any layout mismatch.
  """
  split = _split_payload_and_trailer(payload_len, blob)
  if split is None:
    return None
  payload, _ = split
  if len(payload) < 4:
    return None
  reader = Reader(payload, little_endian=False)
  label = reader.u16()
  em = reader.u16()
  counts: List[int] = []
  while reader.offset() + 10 <= len(payload):
    reader.raw_bytes(6)  # per-read prefix — skipped; meaning unconfirmed.
    counts.append(reader.i32())
  if reader.offset() != len(payload):
    return None
  return label, em, counts
+
+
+def _luminescence_intensity(
+ cal: _LuminescenceCalibration,
+ counts: List[int],
+ dark_integration_s: float,
+ meas_integration_s: float,
+) -> int:
+ if not counts:
+ return 0
+ if dark_integration_s == 0 or meas_integration_s == 0:
+ return 0
+ count_mean = sum(counts) / len(counts)
+ corrected_rate = (count_mean / meas_integration_s) - (cal.ref_dark / dark_integration_s)
+ return int(corrected_rate)
+
+
@dataclass
class _LuminescenceMeasurement:
  """One decoded luminescence well reading."""

  # Dark-corrected count rate when calibration and integration times were
  # available, otherwise the raw mean count.
  intensity: int
+
+
class _LuminescenceRunDecoder(_MeasurementDecoder):
  """Incrementally decode luminescence measurement frames.

  A 10-byte calibration packet supplies the dark count; subsequent data
  packets become dark-corrected count rates when both integration times are
  known, otherwise raw mean counts.
  """

  def __init__(
    self,
    expected: int,
    *,
    dark_integration_s: float = 0.0,
    meas_integration_s: float = 0.0,
  ) -> None:
    super().__init__(expected)
    self.measurements: List[_LuminescenceMeasurement] = []
    self._calibration: Optional[_LuminescenceCalibration] = None
    # Integration times in seconds; 0.0 disables dark-rate correction.
    self._dark_integration_s = float(dark_integration_s)
    self._meas_integration_s = float(meas_integration_s)

  @property
  def count(self) -> int:
    return len(self.measurements)

  def _should_consume_bin(self, payload_len: int) -> bool:
    # 10-byte payloads are calibration packets; a 4-byte header plus n
    # 10-byte count items is a data packet.
    if payload_len == 10:
      return True
    if payload_len >= 14 and (payload_len - 4) % 10 == 0:
      return True
    return False

  def _handle_bin(self, payload_len: int, blob: bytes) -> None:
    if payload_len == 10:
      cal = _decode_lum_calibration(payload_len, blob)
      if cal is not None:
        self._calibration = cal
      return
    data = _decode_lum_data(payload_len, blob)
    if data is None:
      return
    _label, _em, counts = data
    if self._calibration is not None and self._dark_integration_s and self._meas_integration_s:
      intensity = _luminescence_intensity(
        self._calibration, counts, self._dark_integration_s, self._meas_integration_s
      )
    else:
      # Without calibration or integration times, report the raw mean count.
      intensity = int(round(sum(counts) / len(counts))) if counts else 0
    self.measurements.append(_LuminescenceMeasurement(intensity=intensity))