diff --git a/pylabrobot/plate_reading/__init__.py b/pylabrobot/plate_reading/__init__.py index 81cd7d1f03b..58e3f7fa164 100644 --- a/pylabrobot/plate_reading/__init__.py +++ b/pylabrobot/plate_reading/__init__.py @@ -1,15 +1,40 @@ -from .agilent_biotek_backend import BioTekPlateReaderBackend -from .agilent_biotek_cytation_backend import ( +from __future__ import annotations + +from typing import Any + +from .agilent import ( + BioTekPlateReaderBackend, Cytation5Backend, Cytation5ImagingConfig, CytationBackend, CytationImagingConfig, + SynergyH1Backend, ) -from .agilent_biotek_synergyh1_backend import SynergyH1Backend +from .bmg_labtech import CLARIOstarBackend from .chatterbox import PlateReaderChatterboxBackend -from .clario_star_backend import CLARIOstarBackend from .image_reader import ImageReader from .imager import Imager +from .molecular_devices import ( + Calibrate, + CarriageSpeed, + KineticSettings, + MolecularDevicesBackend, + MolecularDevicesError, + MolecularDevicesFirmwareError, + MolecularDevicesHardwareError, + MolecularDevicesMotionError, + MolecularDevicesNVRAMError, + MolecularDevicesSettings, + MolecularDevicesSpectraMax384PlusBackend, + MolecularDevicesSpectraMaxM5Backend, + MolecularDevicesUnrecognizedCommandError, + PmtGain, + ReadMode, + ReadOrder, + ReadType, + ShakeSettings, + SpectrumSettings, +) from .plate_reader import PlateReader from .standard import ( Exposure, diff --git a/pylabrobot/plate_reading/agilent/__init__.py b/pylabrobot/plate_reading/agilent/__init__.py new file mode 100644 index 00000000000..59e960a561c --- /dev/null +++ b/pylabrobot/plate_reading/agilent/__init__.py @@ -0,0 +1,8 @@ +from .biotek_backend import BioTekPlateReaderBackend +from .biotek_cytation_backend import ( + Cytation5Backend, + Cytation5ImagingConfig, + CytationBackend, + CytationImagingConfig, +) +from .biotek_synergyh1_backend import SynergyH1Backend diff --git a/pylabrobot/plate_reading/agilent_biotek_backend.py b/pylabrobot/plate_reading/agilent/biotek_backend.py similarity index 100% rename from pylabrobot/plate_reading/agilent_biotek_backend.py rename to pylabrobot/plate_reading/agilent/biotek_backend.py diff --git a/pylabrobot/plate_reading/agilent_biotek_cytation_backend.py b/pylabrobot/plate_reading/agilent/biotek_cytation_backend.py similarity index 99% rename from pylabrobot/plate_reading/agilent_biotek_cytation_backend.py rename to pylabrobot/plate_reading/agilent/biotek_cytation_backend.py index 962828e18ee..5844360a5dc 100644 --- a/pylabrobot/plate_reading/agilent_biotek_cytation_backend.py +++ b/pylabrobot/plate_reading/agilent/biotek_cytation_backend.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from typing import List, Literal, Optional, Tuple, Union -from pylabrobot.plate_reading.agilent_biotek_backend import BioTekPlateReaderBackend +from pylabrobot.plate_reading.agilent.biotek_backend import BioTekPlateReaderBackend from pylabrobot.plate_reading.backend import ImagerBackend from pylabrobot.resources import Plate diff --git a/pylabrobot/plate_reading/agilent_biotek_synergyh1_backend.py b/pylabrobot/plate_reading/agilent/biotek_synergyh1_backend.py similarity index 96% rename from pylabrobot/plate_reading/agilent_biotek_synergyh1_backend.py rename to pylabrobot/plate_reading/agilent/biotek_synergyh1_backend.py index d6d76065e28..20aba4545cb 100644 --- a/pylabrobot/plate_reading/agilent_biotek_synergyh1_backend.py +++ b/pylabrobot/plate_reading/agilent/biotek_synergyh1_backend.py @@ -5,7 +5,7 @@ from pylibftdi import FtdiError -from pylabrobot.plate_reading.agilent_biotek_backend import BioTekPlateReaderBackend +from pylabrobot.plate_reading.agilent.biotek_backend import BioTekPlateReaderBackend logger = logging.getLogger(__name__) diff --git a/pylabrobot/plate_reading/biotek_tests.py b/pylabrobot/plate_reading/agilent/biotek_tests.py similarity index 99% rename from pylabrobot/plate_reading/biotek_tests.py rename to pylabrobot/plate_reading/agilent/biotek_tests.py index 7e5aeaef2f0..98f5d145b95 100644 --- a/pylabrobot/plate_reading/biotek_tests.py +++ b/pylabrobot/plate_reading/agilent/biotek_tests.py @@ -5,7 +5,7 @@ import unittest.mock from typing import Iterator -from pylabrobot.plate_reading.agilent_biotek_cytation_backend import CytationBackend +from pylabrobot.plate_reading.agilent.biotek_cytation_backend import CytationBackend from pylabrobot.resources import CellVis_24_wellplate_3600uL_Fb, CellVis_96_wellplate_350uL_Fb diff --git a/pylabrobot/plate_reading/biotek_backend.py b/pylabrobot/plate_reading/biotek_backend.py new file mode 100644 index 00000000000..c3e71dcba25 --- /dev/null +++ b/pylabrobot/plate_reading/biotek_backend.py @@ -0,0 +1,8 @@ +import warnings + +from .agilent.biotek_backend import BioTekPlateReaderBackend # noqa: F401 + +warnings.warn( + "pylabrobot.plate_reading.biotek_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.agilent.biotek_backend instead.", +) diff --git a/pylabrobot/plate_reading/biotek_cytation_backend.py b/pylabrobot/plate_reading/biotek_cytation_backend.py new file mode 100644 index 00000000000..c89a11cbf88 --- /dev/null +++ b/pylabrobot/plate_reading/biotek_cytation_backend.py @@ -0,0 +1,13 @@ +import warnings + +from .agilent.biotek_cytation_backend import ( + Cytation5Backend, # noqa: F401 + Cytation5ImagingConfig, # noqa: F401 + CytationBackend, # noqa: F401 + CytationImagingConfig, # noqa: F401 +) + +warnings.warn( + "pylabrobot.plate_reading.biotek_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.agilent.biotek_backend instead.", +) diff --git a/pylabrobot/plate_reading/biotek_synergyh1_backend.py b/pylabrobot/plate_reading/biotek_synergyh1_backend.py new file mode 100644 index 00000000000..8f6589ab25d --- /dev/null +++ b/pylabrobot/plate_reading/biotek_synergyh1_backend.py @@ -0,0 +1,8 @@ +import warnings + +from .agilent.biotek_synergyh1_backend import SynergyH1Backend # noqa: F401 + +warnings.warn( + "pylabrobot.plate_reading.biotek_synergyh1_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.agilent.biotek_synergyh1_backend instead.", +) diff --git a/pylabrobot/plate_reading/bmg_labtech/__init__.py b/pylabrobot/plate_reading/bmg_labtech/__init__.py new file mode 100644 index 00000000000..fe8e943fedb --- /dev/null +++ b/pylabrobot/plate_reading/bmg_labtech/__init__.py @@ -0,0 +1 @@ +from .clario_star_backend import CLARIOstarBackend diff --git a/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py b/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py new file mode 100644 index 00000000000..bdbf188a303 --- /dev/null +++ b/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py @@ -0,0 +1,405 @@ +import asyncio +import logging +import math +import struct +import sys +import time +from typing import Dict, List, Optional, Tuple, Union + +from pylabrobot.resources.well import Well + +try: + from pylibftdi import driver + + HAS_PYLIBFTDI = True +except ImportError as e: + HAS_PYLIBFTDI = False + _FTDI_IMPORT_ERROR = e + +from pylabrobot import utils +from pylabrobot.io.ftdi import FTDI +from pylabrobot.resources.plate import Plate + +from ..backend import PlateReaderBackend + +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal + +logger = logging.getLogger("pylabrobot") + +# Make pylibftdi scan the CLARIOstar VID:PID +# appears as ID 0403:bb68 Future Technology Devices International Limited CLARIOstar + +if HAS_PYLIBFTDI: + driver.USB_VID_LIST.append(0x0403) # i.e. 1027 + driver.USB_PID_LIST.append(0xBB68) # i.e. 47976 + + +class CLARIOstarBackend(PlateReaderBackend): + """A plate reader backend for the Clario star. Note that this is not a complete implementation + and many commands and parameters are not implemented yet.""" + + def __init__(self, device_id: Optional[str] = None): + self.io = FTDI(device_id=device_id) + + async def setup(self): + await self.io.setup() + await self.io.set_baudrate(125000) + await self.io.set_line_property(8, 0, 0) # 8N1 + await self.io.set_latency_timer(2) + + await self.initialize() + await self.request_eeprom_data() + + async def stop(self): + await self.io.stop() + + async def get_stat(self): + stat = await self.io.poll_modem_status() + return hex(stat) + + async def read_resp(self, timeout=20) -> bytes: + """Read a response from the plate reader. If the timeout is reached, return the data that has + been read so far.""" + + d = b"" + last_read = b"" + end_byte_found = False + t = time.time() + + # Commands are terminated with 0x0d, but this value may also occur as a part of the response. + # Therefore, we read until we read a 0x0d, but if that's the last byte we read in a full packet, + # we keep reading for at least one more cycle. We only check the timeout if the last read was + # unsuccessful (i.e. keep reading if we are still getting data). + while True: + last_read = await self.io.read(25) # 25 is max length observed in pcap + if len(last_read) > 0: + d += last_read + end_byte_found = d[-1] == 0x0D + if ( + len(last_read) < 25 and end_byte_found + ): # if we read less than 25 bytes, we're at the end + break + else: + # If we didn't read any data, check if the last read ended in an end byte. If so, we're done + if end_byte_found: + break + + # Check if we've timed out. + if time.time() - t > timeout: + logger.warning("timed out reading response") + break + + # If we read data, we don't wait and immediately try to read more. + await asyncio.sleep(0.0001) + + logger.debug("read %s", d.hex()) + + return d + + async def send(self, cmd: Union[bytearray, bytes], read_timeout=20): + """Send a command to the plate reader and return the response.""" + + checksum = (sum(cmd) & 0xFFFF).to_bytes(2, byteorder="big") + cmd = cmd + checksum + b"\x0d" + + logger.debug("sending %s", cmd.hex()) + + w = await self.io.write(cmd) + + logger.debug("wrote %s bytes", w) + + assert w == len(cmd) + + resp = await self.read_resp(timeout=read_timeout) + return resp + + async def _wait_for_ready_and_return(self, ret, timeout=150): + """Wait for the plate reader to be ready and return the response.""" + last_status = None + t = time.time() + while time.time() - t < timeout: + await asyncio.sleep(0.1) + + command_status = await self.read_command_status() + + if len(command_status) != 24: + logger.warning( + "unexpected response %s. I think a command status response is always 24 bytes", + command_status, + ) + continue + + if command_status != last_status: + logger.info("status changed %s", command_status.hex()) + last_status = command_status + else: + continue + + if command_status[2] != 0x18 or command_status[3] != 0x0C or command_status[4] != 0x01: + logger.warning( + "unexpected response %s. I think 18 0c 01 indicates a command status response", + command_status, + ) + + if command_status[5] not in { + 0x25, + 0x05, + }: # 25 is busy, 05 is ready. probably. + logger.warning("unexpected response %s.", command_status) + + if command_status[5] == 0x05: + logger.debug("status is ready") + return ret + + async def read_command_status(self): + status = await self.send(b"\x02\x00\x09\x0c\x80\x00") + return status + + async def initialize(self): + command_response = await self.send(b"\x02\x00\x0d\x0c\x01\x00\x00\x10\x02\x00") + return await self._wait_for_ready_and_return(command_response) + + async def request_eeprom_data(self): + eeprom_response = await self.send(b"\x02\x00\x0f\x0c\x05\x07\x00\x00\x00\x00\x00\x00") + return await self._wait_for_ready_and_return(eeprom_response) + + async def open(self): + open_response = await self.send(b"\x02\x00\x0e\x0c\x03\x01\x00\x00\x00\x00\x00") + return await self._wait_for_ready_and_return(open_response) + + async def close(self, plate: Optional[Plate] = None): + close_response = await self.send(b"\x02\x00\x0e\x0c\x03\x00\x00\x00\x00\x00\x00") + return await self._wait_for_ready_and_return(close_response) + + async def _mp_and_focus_height_value(self): + mp_and_focus_height_value_response = await self.send( + b"\x02\x00\x0f\x0c\x05\17\x00\x00\x00\x00" + b"\x00\x00" + ) + return await self._wait_for_ready_and_return(mp_and_focus_height_value_response) + + async def _run_luminescence(self, focal_height: float): + """Run a plate reader luminescence run.""" + + assert 0 <= focal_height <= 25, "focal height must be between 0 and 25 mm" + + focal_height_data = int(focal_height * 100).to_bytes(2, byteorder="big") + + run_response = await self.send( + b"\x02\x00\x86\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56" + b"\x1d\x06\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27" + b"\x0f\x27\x0f\x01" + focal_height_data + b"\x00\x00\x01\x00\x00\x0e\x10\x00\x01\x00\x01\x00" + b"\x01\x00\x01\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01" + b"\x00\x00\x00\x01\x00\x64\x00\x20\x00\x00" + ) + + # TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return. + last_status = None + while True: + await asyncio.sleep(0.1) + + command_status = await self.read_command_status() + + if command_status != last_status: + last_status = command_status + logger.info("status changed %s", command_status) + continue + + if command_status == bytes( + b"\x02\x00\x18\x0c\x01\x25\x04\x2e\x00\x00\x04\x01\x00\x00\x03\x00" + b"\x00\x00\x00\xc0\x00\x01\x46\x0d" + ): + return run_response + + async def _run_absorbance(self, wavelength: float): + """Run a plate reader absorbance run.""" + wavelength_data = int(wavelength * 10).to_bytes(2, byteorder="big") + + absorbance_command = ( + b"\x02\x00\x7c\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56\x1d\x06" + b"\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27" + b"\x0f\x19\x01" + wavelength_data + b"\x00\x00\x00\x64\x00\x00\x00\x00\x00\x00\x00\x64\x00" + b"\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x16\x00\x01\x00\x00" + ) + run_response = await self.send(absorbance_command) + + # TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return. + last_status = None + while True: + await asyncio.sleep(0.1) + + command_status = await self.read_command_status() + + if command_status != last_status: + last_status = command_status + logger.info("status changed %s", command_status) + continue + + if command_status == bytes( + b"\x02\x00\x18\x0c\x01\x25\x04\x2e\x00\x00\x04\x01\x00\x00\x03\x00" + b"\x00\x00\x00\xc0\x00\x01\x46\x0d" + ): + return run_response + + async def _read_order_values(self): + return await self.send(b"\x02\x00\x0f\x0c\x05\x1d\x00\x00\x00\x00\x00\x00") + + async def _status_hw(self): + status_hw_response = await self.send(b"\x02\x00\x09\x0c\x81\x00") + return await self._wait_for_ready_and_return(status_hw_response) + + async def _get_measurement_values(self): + return await self.send(b"\x02\x00\x0f\x0c\x05\x02\x00\x00\x00\x00\x00\x00") + + async def read_luminescence( + self, plate: Plate, wells: List[Well], focal_height: float = 13 + ) -> List[Dict]: + """Read luminescence values from the plate reader.""" + if wells != plate.get_all_items(): + raise NotImplementedError("Only full plate reads are supported for now.") + + await self._mp_and_focus_height_value() + + await self._run_luminescence(focal_height=focal_height) + + await self._read_order_values() + + await self._status_hw() + + vals = await self._get_measurement_values() + + # All 96 values are 32 bit integers. The header is variable length, so we need to find the + # start of the data. In the future, when we understand the protocol better, this can be + # replaced with a more robust solution. + start_idx = vals.index(b"\x00\x00\x00\x00\x00\x00") + len(b"\x00\x00\x00\x00\x00\x00") + data = list(vals)[start_idx : start_idx + 96 * 4] + + # group bytes by 4 + int_bytes = [data[i : i + 4] for i in range(0, len(data), 4)] + + # convert to int + ints = [struct.unpack(">i", bytes(int_data))[0] for int_data in int_bytes] + + # for backend conformity, convert to float, and reshape to 2d array + floats: List[List[Optional[float]]] = utils.reshape_2d( + [float(i) for i in ints], (plate.num_items_y, plate.num_items_x) + ) + + return [ + { + "data": floats, + "temperature": float("nan"), # Temperature not available + "time": time.time(), + } + ] + + async def read_absorbance( + self, + plate: Plate, + wells: List[Well], + wavelength: int, + report: Literal["OD", "transmittance"] = "OD", + ) -> List[Dict]: + """Read absorbance values from the device. + + Args: + wavelength: wavelength to read absorbance at, in nanometers. + report: whether to report absorbance as optical depth (OD) or transmittance. Transmittance is + used interchangeably with "transmission" in the CLARIOStar software and documentation. + + Returns: + A list containing a single dictionary, where the key is (wavelength, 0) and the value is + another dictionary containing the data, temperature, and time. + """ + + if wells != plate.get_all_items(): + raise NotImplementedError("Only full plate reads are supported for now.") + + await self._mp_and_focus_height_value() + + await self._run_absorbance(wavelength=wavelength) + + await self._read_order_values() + + await self._status_hw() + + vals = await self._get_measurement_values() + div = b"\x00" * 6 + start_idx = vals.index(div) + len(div) + chromatic_data = vals[start_idx : start_idx + 96 * 4] + ref_data = vals[start_idx + 96 * 4 : start_idx + (96 * 2) * 4] + chromatic_bytes = [bytes(chromatic_data[i : i + 4]) for i in range(0, len(chromatic_data), 4)] + ref_bytes = [bytes(ref_data[i : i + 4]) for i in range(0, len(ref_data), 4)] + chromatic_reading = [struct.unpack(">i", x)[0] for x in chromatic_bytes] + reference_reading = [struct.unpack(">i", x)[0] for x in ref_bytes] + + # c100 is the value of the chromatic at 100% intensity + # c0 is the value of the chromatic at 0% intensity (black reading) + # r100 is the value of the reference at 100% intensity + # r0 is the value of the reference at 0% intensity (black reading) + after_values_idx = start_idx + (96 * 2) * 4 + c100, c0, r100, r0 = struct.unpack(">iiii", vals[after_values_idx : after_values_idx + 4 * 4]) + + # a bit much, but numpy should not be a dependency + real_chromatic_reading = [] + for cr in chromatic_reading: + real_chromatic_reading.append((cr - c0) / c100) + real_reference_reading = [] + for rr in reference_reading: + real_reference_reading.append((rr - r0) / r100) + + transmittance: List[Optional[float]] = [] + for rcr, rrr in zip(real_chromatic_reading, real_reference_reading): + transmittance.append(rcr / rrr * 100) + + data: List[List[Optional[float]]] + if report == "OD": + od: List[Optional[float]] = [] + for t in transmittance: + od.append(math.log10(100 / t) if t is not None and t > 0 else None) + data = utils.reshape_2d(od, (plate.num_items_y, plate.num_items_x)) + elif report == "transmittance": + data = utils.reshape_2d(transmittance, (plate.num_items_y, plate.num_items_x)) + else: + raise ValueError(f"Invalid report type: {report}") + + return [ + { + "wavelength": wavelength, + "data": data, + "temperature": float("nan"), # Temperature not available + "time": time.time(), + } + ] + + async def read_fluorescence( + self, + plate: Plate, + wells: List[Well], + excitation_wavelength: int, + emission_wavelength: int, + focal_height: float, + ) -> List[Dict[Tuple[int, int], Dict]]: + raise NotImplementedError("Not implemented yet") + + +# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) +# https://github.com/PyLabRobot/pylabrobot/issues/466 + + +class CLARIOStar: + def __init__(self, *args, **kwargs): + raise RuntimeError("`CLARIOStar` is deprecated. Please use `CLARIOStarBackend` instead.") + + +class CLARIOStarBackend: + def __init__(self, *args, **kwargs): + raise RuntimeError( + "`CLARIOStarBackend` (capital 'S') is deprecated. Please use `CLARIOstarBackend` instead." + ) diff --git a/pylabrobot/plate_reading/clario_star_backend.py b/pylabrobot/plate_reading/clario_star_backend.py index 23435b86bc0..db37b025a1f 100644 --- a/pylabrobot/plate_reading/clario_star_backend.py +++ b/pylabrobot/plate_reading/clario_star_backend.py @@ -1,405 +1,8 @@ -import asyncio -import logging -import math -import struct -import sys -import time -from typing import Dict, List, Optional, Tuple, Union +import warnings -from pylabrobot.resources.well import Well +from .bmg_labtech.clario_star_backend import CLARIOstarBackend # noqa: F401 -try: - from pylibftdi import driver - - HAS_PYLIBFTDI = True -except ImportError as e: - HAS_PYLIBFTDI = False - _FTDI_IMPORT_ERROR = e - -from pylabrobot import utils -from pylabrobot.io.ftdi import FTDI -from pylabrobot.resources.plate import Plate - -from .backend import PlateReaderBackend - -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal - -logger = logging.getLogger("pylabrobot") - -# Make pylibftdi scan the CLARIOstar VID:PID -# appears as ID 0403:bb68 Future Technology Devices International Limited CLARIOstar - -if HAS_PYLIBFTDI: - driver.USB_VID_LIST.append(0x0403) # i.e. 1027 - driver.USB_PID_LIST.append(0xBB68) # i.e. 47976 - - -class CLARIOstarBackend(PlateReaderBackend): - """A plate reader backend for the Clario star. Note that this is not a complete implementation - and many commands and parameters are not implemented yet.""" - - def __init__(self, device_id: Optional[str] = None): - self.io = FTDI(device_id=device_id) - - async def setup(self): - await self.io.setup() - await self.io.set_baudrate(125000) - await self.io.set_line_property(8, 0, 0) # 8N1 - await self.io.set_latency_timer(2) - - await self.initialize() - await self.request_eeprom_data() - - async def stop(self): - await self.io.stop() - - async def get_stat(self): - stat = await self.io.poll_modem_status() - return hex(stat) - - async def read_resp(self, timeout=20) -> bytes: - """Read a response from the plate reader. If the timeout is reached, return the data that has - been read so far.""" - - d = b"" - last_read = b"" - end_byte_found = False - t = time.time() - - # Commands are terminated with 0x0d, but this value may also occur as a part of the response. - # Therefore, we read until we read a 0x0d, but if that's the last byte we read in a full packet, - # we keep reading for at least one more cycle. We only check the timeout if the last read was - # unsuccessful (i.e. keep reading if we are still getting data). - while True: - last_read = await self.io.read(25) # 25 is max length observed in pcap - if len(last_read) > 0: - d += last_read - end_byte_found = d[-1] == 0x0D - if ( - len(last_read) < 25 and end_byte_found - ): # if we read less than 25 bytes, we're at the end - break - else: - # If we didn't read any data, check if the last read ended in an end byte. If so, we're done - if end_byte_found: - break - - # Check if we've timed out. - if time.time() - t > timeout: - logger.warning("timed out reading response") - break - - # If we read data, we don't wait and immediately try to read more. - await asyncio.sleep(0.0001) - - logger.debug("read %s", d.hex()) - - return d - - async def send(self, cmd: Union[bytearray, bytes], read_timeout=20): - """Send a command to the plate reader and return the response.""" - - checksum = (sum(cmd) & 0xFFFF).to_bytes(2, byteorder="big") - cmd = cmd + checksum + b"\x0d" - - logger.debug("sending %s", cmd.hex()) - - w = await self.io.write(cmd) - - logger.debug("wrote %s bytes", w) - - assert w == len(cmd) - - resp = await self.read_resp(timeout=read_timeout) - return resp - - async def _wait_for_ready_and_return(self, ret, timeout=150): - """Wait for the plate reader to be ready and return the response.""" - last_status = None - t = time.time() - while time.time() - t < timeout: - await asyncio.sleep(0.1) - - command_status = await self.read_command_status() - - if len(command_status) != 24: - logger.warning( - "unexpected response %s. I think a command status response is always 24 bytes", - command_status, - ) - continue - - if command_status != last_status: - logger.info("status changed %s", command_status.hex()) - last_status = command_status - else: - continue - - if command_status[2] != 0x18 or command_status[3] != 0x0C or command_status[4] != 0x01: - logger.warning( - "unexpected response %s. I think 18 0c 01 indicates a command status response", - command_status, - ) - - if command_status[5] not in { - 0x25, - 0x05, - }: # 25 is busy, 05 is ready. probably. - logger.warning("unexpected response %s.", command_status) - - if command_status[5] == 0x05: - logger.debug("status is ready") - return ret - - async def read_command_status(self): - status = await self.send(b"\x02\x00\x09\x0c\x80\x00") - return status - - async def initialize(self): - command_response = await self.send(b"\x02\x00\x0d\x0c\x01\x00\x00\x10\x02\x00") - return await self._wait_for_ready_and_return(command_response) - - async def request_eeprom_data(self): - eeprom_response = await self.send(b"\x02\x00\x0f\x0c\x05\x07\x00\x00\x00\x00\x00\x00") - return await self._wait_for_ready_and_return(eeprom_response) - - async def open(self): - open_response = await self.send(b"\x02\x00\x0e\x0c\x03\x01\x00\x00\x00\x00\x00") - return await self._wait_for_ready_and_return(open_response) - - async def close(self, plate: Optional[Plate] = None): - close_response = await self.send(b"\x02\x00\x0e\x0c\x03\x00\x00\x00\x00\x00\x00") - return await self._wait_for_ready_and_return(close_response) - - async def _mp_and_focus_height_value(self): - mp_and_focus_height_value_response = await self.send( - b"\x02\x00\x0f\x0c\x05\17\x00\x00\x00\x00" + b"\x00\x00" - ) - return await self._wait_for_ready_and_return(mp_and_focus_height_value_response) - - async def _run_luminescence(self, focal_height: float): - """Run a plate reader luminescence run.""" - - assert 0 <= focal_height <= 25, "focal height must be between 0 and 25 mm" - - focal_height_data = int(focal_height * 100).to_bytes(2, byteorder="big") - - run_response = await self.send( - b"\x02\x00\x86\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56" - b"\x1d\x06\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27" - b"\x0f\x27\x0f\x01" + focal_height_data + b"\x00\x00\x01\x00\x00\x0e\x10\x00\x01\x00\x01\x00" - b"\x01\x00\x01\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01" - b"\x00\x00\x00\x01\x00\x64\x00\x20\x00\x00" - ) - - # TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return. - last_status = None - while True: - await asyncio.sleep(0.1) - - command_status = await self.read_command_status() - - if command_status != last_status: - last_status = command_status - logger.info("status changed %s", command_status) - continue - - if command_status == bytes( - b"\x02\x00\x18\x0c\x01\x25\x04\x2e\x00\x00\x04\x01\x00\x00\x03\x00" - b"\x00\x00\x00\xc0\x00\x01\x46\x0d" - ): - return run_response - - async def _run_absorbance(self, wavelength: float): - """Run a plate reader absorbance run.""" - wavelength_data = int(wavelength * 10).to_bytes(2, byteorder="big") - - absorbance_command = ( - b"\x02\x00\x7c\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56\x1d\x06" - b"\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27" - b"\x0f\x19\x01" + wavelength_data + b"\x00\x00\x00\x64\x00\x00\x00\x00\x00\x00\x00\x64\x00" - b"\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x16\x00\x01\x00\x00" - ) - run_response = await self.send(absorbance_command) - - # TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return. - last_status = None - while True: - await asyncio.sleep(0.1) - - command_status = await self.read_command_status() - - if command_status != last_status: - last_status = command_status - logger.info("status changed %s", command_status) - continue - - if command_status == bytes( - b"\x02\x00\x18\x0c\x01\x25\x04\x2e\x00\x00\x04\x01\x00\x00\x03\x00" - b"\x00\x00\x00\xc0\x00\x01\x46\x0d" - ): - return run_response - - async def _read_order_values(self): - return await self.send(b"\x02\x00\x0f\x0c\x05\x1d\x00\x00\x00\x00\x00\x00") - - async def _status_hw(self): - status_hw_response = await self.send(b"\x02\x00\x09\x0c\x81\x00") - return await self._wait_for_ready_and_return(status_hw_response) - - async def _get_measurement_values(self): - return await self.send(b"\x02\x00\x0f\x0c\x05\x02\x00\x00\x00\x00\x00\x00") - - async def read_luminescence( - self, plate: Plate, wells: List[Well], focal_height: float = 13 - ) -> List[Dict]: - """Read luminescence values from the plate reader.""" - if wells != plate.get_all_items(): - raise NotImplementedError("Only full plate reads are supported for now.") - - await self._mp_and_focus_height_value() - - await self._run_luminescence(focal_height=focal_height) - - await self._read_order_values() - - await self._status_hw() - - vals = await self._get_measurement_values() - - # All 96 values are 32 bit integers. The header is variable length, so we need to find the - # start of the data. In the future, when we understand the protocol better, this can be - # replaced with a more robust solution. - start_idx = vals.index(b"\x00\x00\x00\x00\x00\x00") + len(b"\x00\x00\x00\x00\x00\x00") - data = list(vals)[start_idx : start_idx + 96 * 4] - - # group bytes by 4 - int_bytes = [data[i : i + 4] for i in range(0, len(data), 4)] - - # convert to int - ints = [struct.unpack(">i", bytes(int_data))[0] for int_data in int_bytes] - - # for backend conformity, convert to float, and reshape to 2d array - floats: List[List[Optional[float]]] = utils.reshape_2d( - [float(i) for i in ints], (plate.num_items_y, plate.num_items_x) - ) - - return [ - { - "data": floats, - "temperature": float("nan"), # Temperature not available - "time": time.time(), - } - ] - - async def read_absorbance( - self, - plate: Plate, - wells: List[Well], - wavelength: int, - report: Literal["OD", "transmittance"] = "OD", - ) -> List[Dict]: - """Read absorbance values from the device. - - Args: - wavelength: wavelength to read absorbance at, in nanometers. - report: whether to report absorbance as optical depth (OD) or transmittance. Transmittance is - used interchangeably with "transmission" in the CLARIOStar software and documentation. - - Returns: - A list containing a single dictionary, where the key is (wavelength, 0) and the value is - another dictionary containing the data, temperature, and time. - """ - - if wells != plate.get_all_items(): - raise NotImplementedError("Only full plate reads are supported for now.") - - await self._mp_and_focus_height_value() - - await self._run_absorbance(wavelength=wavelength) - - await self._read_order_values() - - await self._status_hw() - - vals = await self._get_measurement_values() - div = b"\x00" * 6 - start_idx = vals.index(div) + len(div) - chromatic_data = vals[start_idx : start_idx + 96 * 4] - ref_data = vals[start_idx + 96 * 4 : start_idx + (96 * 2) * 4] - chromatic_bytes = [bytes(chromatic_data[i : i + 4]) for i in range(0, len(chromatic_data), 4)] - ref_bytes = [bytes(ref_data[i : i + 4]) for i in range(0, len(ref_data), 4)] - chromatic_reading = [struct.unpack(">i", x)[0] for x in chromatic_bytes] - reference_reading = [struct.unpack(">i", x)[0] for x in ref_bytes] - - # c100 is the value of the chromatic at 100% intensity - # c0 is the value of the chromatic at 0% intensity (black reading) - # r100 is the value of the reference at 100% intensity - # r0 is the value of the reference at 0% intensity (black reading) - after_values_idx = start_idx + (96 * 2) * 4 - c100, c0, r100, r0 = struct.unpack(">iiii", vals[after_values_idx : after_values_idx + 4 * 4]) - - # a bit much, but numpy should not be a dependency - real_chromatic_reading = [] - for cr in chromatic_reading: - real_chromatic_reading.append((cr - c0) / c100) - real_reference_reading = [] - for rr in reference_reading: - real_reference_reading.append((rr - r0) / r100) - - transmittance: List[Optional[float]] = [] - for rcr, rrr in zip(real_chromatic_reading, real_reference_reading): - transmittance.append(rcr / rrr * 100) - - data: List[List[Optional[float]]] - if report == "OD": - od: List[Optional[float]] = [] - for t in transmittance: - od.append(math.log10(100 / t) if t is not None and t > 0 else None) - data = utils.reshape_2d(od, (plate.num_items_y, plate.num_items_x)) - elif report == "transmittance": - data = utils.reshape_2d(transmittance, (plate.num_items_y, plate.num_items_x)) - else: - raise ValueError(f"Invalid report type: {report}") - - return [ - { - "wavelength": wavelength, - "data": data, - "temperature": float("nan"), # Temperature not available - "time": time.time(), - } - ] - - async def read_fluorescence( - self, - plate: Plate, - wells: List[Well], - excitation_wavelength: int, - emission_wavelength: int, - focal_height: float, - ) -> List[Dict[Tuple[int, int], Dict]]: - raise NotImplementedError("Not implemented yet") - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 - - -class CLARIOStar: - def __init__(self, *args, **kwargs): - raise RuntimeError("`CLARIOStar` is deprecated. Please use `CLARIOStarBackend` instead.") - - -class CLARIOStarBackend: - def __init__(self, *args, **kwargs): - raise RuntimeError( - "`CLARIOStarBackend` (capital 'S') is deprecated. Please use `CLARIOstarBackend` instead." - ) +warnings.warn( + "pylabrobot.plate_reading.clario_star_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.bmg_labtech.clario_star_backend instead.", +) diff --git a/pylabrobot/plate_reading/molecular_devices/__init__.py b/pylabrobot/plate_reading/molecular_devices/__init__.py new file mode 100644 index 00000000000..4195d7f64ad --- /dev/null +++ b/pylabrobot/plate_reading/molecular_devices/__init__.py @@ -0,0 +1,43 @@ +from .backend import ( + Calibrate, + CarriageSpeed, + KineticSettings, + MolecularDevicesBackend, + MolecularDevicesError, + MolecularDevicesFirmwareError, + MolecularDevicesHardwareError, + MolecularDevicesMotionError, + MolecularDevicesNVRAMError, + MolecularDevicesSettings, + MolecularDevicesUnrecognizedCommandError, + PmtGain, + ReadMode, + ReadOrder, + ReadType, + ShakeSettings, + SpectrumSettings, +) +from .spectramax_384_plus_backend import MolecularDevicesSpectraMax384PlusBackend +from .spectramax_m5_backend import MolecularDevicesSpectraMaxM5Backend + +__all__ = [ + "MolecularDevicesBackend", + "MolecularDevicesSettings", + "MolecularDevicesError", + "MolecularDevicesUnrecognizedCommandError", + "MolecularDevicesFirmwareError", + "MolecularDevicesHardwareError", + "MolecularDevicesMotionError", + "MolecularDevicesNVRAMError", + "ReadMode", + "ReadType", + "ReadOrder", + "Calibrate", + "CarriageSpeed", + "PmtGain", + "ShakeSettings", + "KineticSettings", + "SpectrumSettings", + "MolecularDevicesSpectraMaxM5Backend", + "MolecularDevicesSpectraMax384PlusBackend", +] diff --git a/pylabrobot/plate_reading/molecular_devices/backend.py b/pylabrobot/plate_reading/molecular_devices/backend.py new file mode 100644 index 00000000000..ca94a084a71 --- /dev/null +++ b/pylabrobot/plate_reading/molecular_devices/backend.py @@ -0,0 +1,983 @@ +import asyncio +import logging +import re +import time +from abc import ABCMeta +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, List, Literal, Optional, Tuple, Union + +from pylabrobot.io.serial import Serial +from pylabrobot.plate_reading.backend import PlateReaderBackend +from pylabrobot.resources.plate import Plate + +logger = logging.getLogger("pylabrobot") + +RES_TERM_CHAR = b">" +COMMAND_TERMINATORS: Dict[str, int] = { + "!AUTOFILTER": 1, + "!AUTOPMT": 1, + "!BAUD": 1, + "!CALIBRATE": 1, + "!CANCEL": 1, + "!CLEAR": 1, + "!CLOSE": 1, + "!CSPEED": 1, + "!REFERENCE": 1, + "!EMFILTER": 1, + "!EMWAVELENGTH": 1, + "!ERROR": 2, + "!EXWAVELENGTH": 1, + "!FPW": 1, + "!INIT": 1, + "!MODE": 1, + "!NVRAM": 1, + "!OPEN": 1, + "!ORDER": 1, + "OPTION": 2, + "!AIR_CAL": 1, + "!PMT": 1, + "!PMTCAL": 1, + "!QUEUE": 2, + "!READ": 1, + "!TOP": 1, + "!READSTAGE": 2, + "!READTYPE": 2, + "!RESEND": 1, + "!RESET": 1, + "!SHAKE": 1, + "!SPEED": 2, + "!STATUS": 2, + "!STRIP": 1, + "!TAG": 1, + "!TEMP": 2, + "!TRANSFER": 2, + "!USER_NUMBER": 2, + "!XPOS": 1, + "!YPOS": 1, + "!WAVELENGTH": 1, + "!WELLSCANMODE": 2, + "!PATHCAL": 2, + "!COUNTTIME": 1, + "!COUNTTIMEDELAY": 1, +} + + +class MolecularDevicesError(Exception): + """Exceptions raised by a Molecular Devices plate reader.""" + + +class MolecularDevicesUnrecognizedCommandError(MolecularDevicesError): + """Unrecognized command errors sent from the computer.""" + + +class MolecularDevicesFirmwareError(MolecularDevicesError): + """Firmware errors.""" + + +class MolecularDevicesHardwareError(MolecularDevicesError): + """Hardware errors.""" + + +class MolecularDevicesMotionError(MolecularDevicesError): + """Motion errors.""" + + +class MolecularDevicesNVRAMError(MolecularDevicesError): + """NVRAM errors.""" + + +ERROR_CODES: Dict[int, Tuple[str, type]] = { + 100: ("command not found", MolecularDevicesUnrecognizedCommandError), + 101: ("invalid argument", MolecularDevicesUnrecognizedCommandError), + 102: ("too many arguments", MolecularDevicesUnrecognizedCommandError), + 103: ("not enough arguments", MolecularDevicesUnrecognizedCommandError), + 104: ("input line too long", MolecularDevicesUnrecognizedCommandError), + 105: ("command invalid, system busy", MolecularDevicesUnrecognizedCommandError), + 106: ("command invalid, measurement in progress", MolecularDevicesUnrecognizedCommandError), + 107: ("no data to transfer", MolecularDevicesUnrecognizedCommandError), + 108: ("data buffer full", MolecularDevicesUnrecognizedCommandError), + 109: ("error buffer overflow", MolecularDevicesUnrecognizedCommandError), + 110: ("stray light cuvette, door open?", MolecularDevicesUnrecognizedCommandError), + 111: ("invalid read settings", MolecularDevicesUnrecognizedCommandError), + 200: ("assert failed", MolecularDevicesFirmwareError), + 201: ("bad error number", MolecularDevicesFirmwareError), + 202: ("receive queue overflow", MolecularDevicesFirmwareError), + 203: ("serial port parity error", MolecularDevicesFirmwareError), + 204: ("serial port overrun error", MolecularDevicesFirmwareError), + 205: ("serial port framing error", MolecularDevicesFirmwareError), + 206: ("cmd generated too much output", MolecularDevicesFirmwareError), + 207: ("fatal trap", MolecularDevicesFirmwareError), + 208: ("RTOS error", MolecularDevicesFirmwareError), + 209: ("stack overflow", MolecularDevicesFirmwareError), + 210: ("unknown interrupt", MolecularDevicesFirmwareError), + 300: ("thermistor faulty", MolecularDevicesHardwareError), + 301: ("safe temperature limit exceeded", MolecularDevicesHardwareError), + 302: ("low light", MolecularDevicesHardwareError), + 303: ("unable to cal dark current", MolecularDevicesHardwareError), + 304: ("signal level saturation", MolecularDevicesHardwareError), + 305: ("reference level saturation", MolecularDevicesHardwareError), + 306: ("plate air cal fail, low light", MolecularDevicesHardwareError), + 307: ("cuv air ref fail", MolecularDevicesHardwareError), + 308: ("stray light", MolecularDevicesHardwareError), + 312: ("gain calibration failed", MolecularDevicesHardwareError), + 313: ("reference gain check fail", MolecularDevicesHardwareError), + 314: ("low lamp level warning", MolecularDevicesHardwareError), + 315: ("can't find zero order", MolecularDevicesHardwareError), + 316: ("grating motor driver faulty", MolecularDevicesHardwareError), + 317: ("monitor ADC faulty", MolecularDevicesHardwareError), + 400: ("carriage motion error", MolecularDevicesMotionError), + 401: ("filter wheel error", MolecularDevicesMotionError), + 402: ("grating error", MolecularDevicesMotionError), + 403: ("stage error", MolecularDevicesMotionError), + 500: ("NVRAM CRC corrupt", MolecularDevicesNVRAMError), + 501: ("NVRAM Grating cal data bad", MolecularDevicesNVRAMError), + 502: ("NVRAM Cuvette air cal data error", MolecularDevicesNVRAMError), + 503: ("NVRAM Plate air cal data error", MolecularDevicesNVRAMError), + 504: ("NVRAM Carriage offset error", MolecularDevicesNVRAMError), + 505: ("NVRAM Stage offset error", MolecularDevicesNVRAMError), +} + + +MolecularDevicesResponse = List[str] + + +class ReadMode(Enum): + """The read mode of the plate reader (e.g., Absorbance, Fluorescence).""" + + ABS = "ABS" + FLU = "FLU" + LUM = "LUM" + POLAR = "POLAR" + TIME = "TIME" + + +class ReadType(Enum): + """The type of read to perform (e.g., Endpoint, Kinetic).""" + + ENDPOINT = "ENDPOINT" + KINETIC = "KINETIC" + SPECTRUM = "SPECTRUM" + WELL_SCAN = "WELLSCAN" + + +class ReadOrder(Enum): + """The order in which to read the plate wells.""" + + COLUMN = "COLUMN" + WAVELENGTH = "WAVELENGTH" + + +class Calibrate(Enum): + """The calibration mode for the read.""" + + ON = "ON" + ONCE = "ONCE" + OFF = "OFF" + + +class CarriageSpeed(Enum): + """The speed of the plate carriage.""" + + NORMAL = "8" + SLOW = "1" + + +class PmtGain(Enum): + """The photomultiplier tube gain setting.""" + + AUTO = "ON" + HIGH = "HIGH" + MEDIUM = "MED" + LOW = "LOW" + + +@dataclass +class ShakeSettings: + """Settings for shaking the plate during a read.""" + + before_read: bool = False + before_read_duration: int = 0 + between_reads: bool = False + between_reads_duration: int = 0 + + +@dataclass +class KineticSettings: + """Settings for kinetic reads.""" + + interval: int + num_readings: int + + +@dataclass +class SpectrumSettings: + """Settings for spectrum reads.""" + + start_wavelength: int + step: int + num_steps: int + excitation_emission_type: Optional[Literal["EXSPECTRUM", "EMSPECTRUM"]] = None + + +@dataclass +class MolecularDevicesSettings: + """A comprehensive, internal container for all plate reader settings.""" + + plate: Plate = field(repr=False) + read_mode: ReadMode + read_type: ReadType + read_order: ReadOrder + calibrate: Calibrate + shake_settings: Optional[ShakeSettings] + carriage_speed: CarriageSpeed + speed_read: bool + kinetic_settings: Optional[KineticSettings] + spectrum_settings: Optional[SpectrumSettings] + wavelengths: List[Union[int, Tuple[int, bool]]] = field(default_factory=list) + excitation_wavelengths: List[int] = field(default_factory=list) + emission_wavelengths: List[int] = field(default_factory=list) + cutoff_filters: List[int] = field(default_factory=list) + path_check: bool = False + read_from_bottom: bool = False + pmt_gain: Union[PmtGain, int] = PmtGain.AUTO + flashes_per_well: int = 1 + cuvette: bool = False + settling_time: int = 0 + + +class MolecularDevicesBackend(PlateReaderBackend, metaclass=ABCMeta): + """Backend for Molecular Devices plate readers.""" + + def __init__(self, port: str) -> None: + self.port = port + self.io = Serial(self.port, baudrate=9600, timeout=0.2) + + async def setup(self) -> None: + await self.io.setup() + await self.send_command("!") + + async def stop(self) -> None: + await self.io.stop() + + def serialize(self) -> dict: + return {**super().serialize(), "port": self.port} + + async def send_command( + self, command: str, timeout: int = 60, num_res_fields=None + ) -> MolecularDevicesResponse: + """Send a command and receive the response, automatically determining the number of + response fields. + """ + base_command = command.split(" ")[0] + if num_res_fields is None: + num_res_fields = COMMAND_TERMINATORS.get(base_command, 1) + else: + num_res_fields = max(1, num_res_fields) + + await self.io.write(command.encode() + b"\r") + raw_response = b"" + timeout_time = time.time() + timeout + while True: + raw_response += await self.io.readline() + await asyncio.sleep(0.001) + if time.time() > timeout_time: + raise TimeoutError(f"Timeout waiting for response to command: {command}") + if raw_response.count(RES_TERM_CHAR) >= num_res_fields: + break + logger.debug("[plate reader] Command: %s, Response: %s", command, raw_response) + response = raw_response.decode("utf-8").strip().split(RES_TERM_CHAR.decode()) + response = [r.strip() for r in response if r.strip() != ""] + self._parse_basic_errors(response, command) + return response + + def _parse_basic_errors(self, response: List[str], command: str) -> None: + if not response: + raise MolecularDevicesError(f"Command '{command}' failed with empty response.") + + # Check for FAIL in the response + error_code_msg = response[0] if "FAIL" in response[0] else response[-1] + if "FAIL" in error_code_msg: + parts = error_code_msg.split("\t") + try: + error_code_str = parts[-1] + error_code = int(error_code_str.strip()) + if error_code in ERROR_CODES: + message, err_class = ERROR_CODES[error_code] + raise err_class(f"Command '{command}' failed with error {error_code}: {message}") + raise MolecularDevicesError( + f"Command '{command}' failed with unknown error code: {error_code}" + ) + except (ValueError, IndexError): + raise MolecularDevicesError( + f"Command '{command}' failed with unparsable error: {response[0]}" + ) + + if "OK" not in response[0]: + raise MolecularDevicesError(f"Command '{command}' failed with response: {response}") + if "warning" in response[0].lower(): + logger.warning("Warning for command '%s': %s", command, response) + + async def open(self) -> None: + await self.send_command("!OPEN") + + async def close(self, plate: Optional[Plate] = None) -> None: + await self.send_command("!CLOSE") + + async def get_status(self) -> List[str]: + res = await self.send_command("!STATUS") + return res[1].split() + + async def read_error_log(self) -> str: + res = await self.send_command("!ERROR") + return res[1] + + async def clear_error_log(self) -> None: + await self.send_command("!CLEAR ERROR") + + async def get_temperature(self) -> Tuple[float, float]: + res = await self.send_command("!TEMP") + parts = res[1].split() + return (float(parts[1]), float(parts[0])) # current, set_point + + async def set_temperature(self, temperature: float) -> None: + if not (0 <= temperature <= 45): + raise ValueError("Temperature must be between 0 and 45°C.") + await self.send_command(f"!TEMP {temperature}") + + async def get_firmware_version(self) -> List[str]: + res = await self.send_command("!OPTION") + return res[1].split() + + async def start_shake(self) -> None: + await self.send_command("!SHAKE NOW") + + async def stop_shake(self) -> None: + await self.send_command("!SHAKE STOP") + + async def _read_now(self) -> None: + await self.send_command("!READ") + + async def _transfer_data(self, settings: MolecularDevicesSettings) -> List[Dict]: + """Transfer data from the plate reader. For kinetic/spectrum reads, this will transfer data for each + reading and combine them into a single collection. + """ + + if (settings.read_type == ReadType.KINETIC and settings.kinetic_settings) or ( + settings.read_type == ReadType.SPECTRUM and settings.spectrum_settings + ): + if settings.kinetic_settings: + num_readings = settings.kinetic_settings.num_readings + elif settings.spectrum_settings: + num_readings = settings.spectrum_settings.num_steps + else: + raise ValueError("Kinetic or Spectrum settings must be provided for this read type.") + + all_reads = [] + for _ in range(num_readings): + res = await self.send_command("!TRANSFER") + data_str = res[1] + read_data = self._parse_data(data_str, settings) + all_reads.extend(read_data) # Unpack the list + return all_reads + + # For ENDPOINT + res = await self.send_command("!TRANSFER") + data_str = res[1] + return self._parse_data(data_str, settings) + + def _parse_data(self, data_str: str, settings: MolecularDevicesSettings) -> List[Dict]: + lines = re.split(r"\r\n|\n", data_str.strip()) + lines = [line.strip() for line in lines if line.strip()] + + # 1. Parse header + header_parts = lines[0].split("\t") + measurement_time = float(header_parts[0]) + temperature = float(header_parts[1]) + + # 2. Parse wavelengths + line_idx = 1 + while line_idx < len(lines): + line = lines[line_idx] + if line.startswith("L:") and line_idx > 1: + # Data section started + break + line_idx += 1 + + data_collection = [] + cur_read_wavelengths = [] + # 3. Parse data + data_columns: List[List[float]] = [] + # The data section starts at line_idx + for i in range(line_idx, len(lines)): + line = lines[i] + if line.startswith("L:"): + # start of a new data with different wavelength + cur_read_wavelengths.append(line.split("\t")[1:]) + if i > line_idx and data_columns: + data_collection.append(data_columns) + data_columns = [] + match = re.match(r"^\s*(\d+):\s*(.*)", line) + if match: + values_str = re.split(r"\s+", match.group(2).strip()) + values = [] + for v in values_str: + if v.strip().replace(".", "", 1).isdigit(): + values.append(float(v.strip())) + elif v.strip() == "#SAT": + values.append(float("inf")) + else: + values.append(float("nan")) + data_columns.append(values) + if data_columns: + data_collection.append(data_columns) + + # 4. Transpose data to be row-major + data_collection_transposed = [] + for data_columns in data_collection: + data_rows = [] + if data_columns: + num_rows = len(data_columns[0]) + num_cols = len(data_columns) + for i in range(num_rows): + row = [data_columns[j][i] for j in range(num_cols)] + data_rows.append(row) + data_collection_transposed.append(data_rows) + + measurements = [] + read_mode = settings.read_mode + for i, data_rows in enumerate(data_collection_transposed): + measurement = { + "data": data_rows, + "temperature": temperature, + "time": measurement_time, + } + if read_mode == ReadMode.ABS: + wl = int(cur_read_wavelengths[i][0]) + measurement["wavelength"] = wl + elif read_mode == ReadMode.FLU or read_mode == ReadMode.POLAR or read_mode == ReadMode.TIME: + ex_wl = int(cur_read_wavelengths[i][0]) + em_wl = int(cur_read_wavelengths[i][1]) + measurement["ex_wavelength"] = ex_wl + measurement["em_wavelength"] = em_wl + elif read_mode == ReadMode.LUM: + em_wl = int(cur_read_wavelengths[i][1]) + measurement["em_wavelength"] = em_wl + measurements.append(measurement) + + return measurements + + async def _set_clear(self) -> None: + await self.send_command("!CLEAR DATA") + + async def _set_mode(self, settings: MolecularDevicesSettings) -> None: + cmd = f"!MODE {settings.read_type.value}" + if settings.read_type == ReadType.KINETIC and settings.kinetic_settings: + ks = settings.kinetic_settings + cmd += f" {ks.interval} {ks.num_readings}" + elif settings.read_type == ReadType.SPECTRUM and settings.spectrum_settings: + ss = settings.spectrum_settings + cmd = "!MODE" + scan_type = ss.excitation_emission_type or "SPECTRUM" + cmd += f" {scan_type} {ss.start_wavelength} {ss.step} {ss.num_steps}" + await self.send_command(cmd) + + async def _set_wavelengths(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode == ReadMode.ABS: + wl_parts = [] + for wl in settings.wavelengths: + wl_parts.append(f"F{wl[0]}" if isinstance(wl, tuple) and wl[1] else str(wl)) + wl_str = " ".join(wl_parts) + if settings.path_check: + wl_str += " 900 998" + await self.send_command(f"!WAVELENGTH {wl_str}") + elif settings.read_mode in (ReadMode.FLU, ReadMode.POLAR, ReadMode.TIME): + ex_wl_str = " ".join(map(str, settings.excitation_wavelengths)) + em_wl_str = " ".join(map(str, settings.emission_wavelengths)) + await self.send_command(f"!EXWAVELENGTH {ex_wl_str}") + await self.send_command(f"!EMWAVELENGTH {em_wl_str}") + elif settings.read_mode == ReadMode.LUM: + wl_str = " ".join(map(str, settings.emission_wavelengths)) + await self.send_command(f"!EMWAVELENGTH {wl_str}") + else: + raise NotImplementedError("f{settings.read_mode} not supported") + + async def _set_plate_position(self, settings: MolecularDevicesSettings) -> None: + plate = settings.plate + num_cols, num_rows, size_y = plate.num_items_x, plate.num_items_y, plate.get_size_y() + if num_cols < 2 or num_rows < 2: + raise ValueError("Plate must have at least 2 rows and 2 columns to calculate well spacing.") + top_left_well = plate.get_item(0) + if top_left_well.location is None: + raise ValueError("Top left well location is not set.") + top_left_well_center = top_left_well.location + top_left_well.get_anchor(x="c", y="c") + loc_A1 = plate.get_item("A1").location + loc_A2 = plate.get_item("A2").location + loc_B1 = plate.get_item("B1").location + if loc_A1 is None or loc_A2 is None or loc_B1 is None: + raise ValueError("Well locations for A1, A2, or B1 are not set.") + dx = loc_A2.x - loc_A1.x + dy = loc_A1.y - loc_B1.y + + x_pos_cmd = f"!XPOS {top_left_well_center.x:.3f} {dx:.3f} {num_cols}" + y_pos_cmd = f"!YPOS {size_y - top_left_well_center.y:.3f} {dy:.3f} {num_rows}" + await self.send_command(x_pos_cmd) + await self.send_command(y_pos_cmd) + + async def _set_strip(self, settings: MolecularDevicesSettings) -> None: + await self.send_command(f"!STRIP 1 {settings.plate.num_items_x}") + + async def _set_shake(self, settings: MolecularDevicesSettings) -> None: + if not settings.shake_settings: + await self.send_command("!SHAKE OFF") + return + ss = settings.shake_settings + shake_mode = "ON" if ss.before_read or ss.between_reads else "OFF" + before_duration = ss.before_read_duration if ss.before_read else 0 + ki = settings.kinetic_settings.interval if settings.kinetic_settings else 0 + if ss.between_reads and ki > 0: + between_duration = ss.between_reads_duration + wait_duration = ki - between_duration + else: + between_duration = 0 + wait_duration = 0 + await self.send_command(f"!SHAKE {shake_mode}") + await self.send_command(f"!SHAKE {before_duration} {ki} {wait_duration} {between_duration} 0") + + async def _set_carriage_speed(self, settings: MolecularDevicesSettings) -> None: + await self.send_command(f"!CSPEED {settings.carriage_speed.value}") + + async def _set_read_stage(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode in (ReadMode.FLU, ReadMode.LUM, ReadMode.POLAR, ReadMode.TIME): + stage = "BOT" if settings.read_from_bottom else "TOP" + await self.send_command(f"!READSTAGE {stage}") + + async def _set_flashes_per_well(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode in (ReadMode.FLU, ReadMode.LUM, ReadMode.POLAR, ReadMode.TIME): + await self.send_command(f"!FPW {settings.flashes_per_well}") + + async def _set_pmt(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode not in (ReadMode.FLU, ReadMode.LUM, ReadMode.POLAR, ReadMode.TIME): + return + gain = settings.pmt_gain + if gain == PmtGain.AUTO: + await self.send_command("!AUTOPMT ON") + else: + gain_val = gain.value if isinstance(gain, PmtGain) else gain + await self.send_command("!AUTOPMT OFF") + await self.send_command(f"!PMT {gain_val}") + + async def _set_filter(self, settings: MolecularDevicesSettings) -> None: + if ( + settings.read_mode in (ReadMode.FLU, ReadMode.POLAR, ReadMode.TIME) + and settings.cutoff_filters + ): + cf_str = " ".join(map(str, settings.cutoff_filters)) + await self.send_command("!AUTOFILTER OFF") + await self.send_command(f"!EMFILTER {cf_str}") + else: + await self.send_command("!AUTOFILTER ON") + + async def _set_calibrate(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode == ReadMode.ABS: + await self.send_command(f"!CALIBRATE {settings.calibrate.value}") + else: + await self.send_command(f"!PMTCAL {settings.calibrate.value}") + + async def _set_order(self, settings: MolecularDevicesSettings) -> None: + await self.send_command(f"!ORDER {settings.read_order.value}") + + async def _set_speed(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode == ReadMode.ABS: + mode = "ON" if settings.speed_read else "OFF" + await self.send_command(f"!SPEED {mode}") + + async def _set_nvram(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode == ReadMode.POLAR: + command = "FPSETTLETIME" + value = settings.settling_time + else: + command = "CARCOL" + value = settings.settling_time if settings.settling_time > 100 else 100 + await self.send_command(f"!NVRAM {command} {value}") + + async def _set_tag(self, settings: MolecularDevicesSettings) -> None: + if settings.read_mode == ReadMode.POLAR and settings.read_type == ReadType.KINETIC: + await self.send_command("!TAG ON") + else: + await self.send_command("!TAG OFF") + + async def _set_readtype(self, settings: MolecularDevicesSettings) -> None: + """Set the READTYPE command and the expected number of response fields.""" + cuvette = settings.cuvette + num_res_fields = COMMAND_TERMINATORS.get("!READTYPE", 2) + + if settings.read_mode == ReadMode.ABS: + cmd = f"!READTYPE ABS{'CUV' if cuvette else 'PLA'}" + elif settings.read_mode == ReadMode.FLU: + cmd = f"!READTYPE FLU{'CUV' if cuvette else ''}" + num_res_fields = 2 if cuvette else 1 + elif settings.read_mode == ReadMode.LUM: + cmd = f"!READTYPE LUM{'CUV' if cuvette else ''}" + num_res_fields = 2 if cuvette else 1 + elif settings.read_mode == ReadMode.POLAR: + cmd = "!READTYPE POLAR" + num_res_fields = 1 + elif settings.read_mode == ReadMode.TIME: + cmd = "!READTYPE TIME 0 250" + num_res_fields = 1 + else: + raise ValueError(f"Unsupported read mode: {settings.read_mode}") + + await self.send_command(cmd, num_res_fields=num_res_fields) + + async def _set_integration_time( + self, settings: MolecularDevicesSettings, delay_time: int, integration_time: int + ) -> None: + if settings.read_mode == ReadMode.TIME: + await self.send_command(f"!COUNTTIMEDELAY {delay_time}") + await self.send_command(f"!COUNTTIME {integration_time * 0.001}") + + def _get_cutoff_filter_index_from_wavelength(self, wavelength: int) -> int: + """Converts a wavelength to a cutoff filter index.""" + # This map is a direct translation of the `EmissionCutoff.CutoffFilter` in MaxlineModel.cs + # (min_wavelength, max_wavelength, cutoff_filter_index) + FILTERS = [ + (0, 322, 1), + (325, 415, 16), + (420, 435, 2), + (435, 455, 3), + (455, 475, 4), + (475, 495, 5), + (495, 515, 6), + (515, 530, 7), + (530, 550, 8), + (550, 570, 9), + (570, 590, 10), + (590, 610, 11), + (610, 630, 12), + (630, 665, 13), + (665, 695, 14), + (695, 900, 15), + ] + for min_wl, max_wl, cutoff_filter_index in FILTERS: + if min_wl <= wavelength < max_wl: + return cutoff_filter_index + raise ValueError(f"No cutoff filter found for wavelength {wavelength}") + + async def _wait_for_idle(self, timeout: int = 600): + """Wait for the plate reader to become idle.""" + start_time = time.time() + while True: + if time.time() - start_time > timeout: + raise TimeoutError("Timeout waiting for plate reader to become idle.") + status = await self.get_status() + if status and status[1] == "IDLE": + break + await asyncio.sleep(1) + + async def read_absorbance( # type: ignore[override] + self, + plate: Plate, + wavelengths: List[Union[int, Tuple[int, bool]]], + read_type: ReadType = ReadType.ENDPOINT, + read_order: ReadOrder = ReadOrder.COLUMN, + calibrate: Calibrate = Calibrate.ONCE, + shake_settings: Optional[ShakeSettings] = None, + carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, + speed_read: bool = False, + path_check: bool = False, + kinetic_settings: Optional[KineticSettings] = None, + spectrum_settings: Optional[SpectrumSettings] = None, + cuvette: bool = False, + settling_time: int = 0, + timeout: int = 600, + ) -> List[Dict]: + settings = MolecularDevicesSettings( + plate=plate, + read_mode=ReadMode.ABS, + read_type=read_type, + read_order=read_order, + calibrate=calibrate, + shake_settings=shake_settings, + carriage_speed=carriage_speed, + speed_read=speed_read, + path_check=path_check, + kinetic_settings=kinetic_settings, + spectrum_settings=spectrum_settings, + wavelengths=wavelengths, + cuvette=cuvette, + settling_time=settling_time, + ) + await self._set_clear() + if not cuvette: + await self._set_plate_position(settings) + await self._set_strip(settings) + await self._set_carriage_speed(settings) + + await self._set_shake(settings) + await self._set_wavelengths(settings) + await self._set_calibrate(settings) + await self._set_mode(settings) + await self._set_order(settings) + await self._set_speed(settings) + await self._set_tag(settings) + await self._set_nvram(settings) + await self._set_readtype(settings) + + await self._read_now() + await self._wait_for_idle(timeout=timeout) + return await self._transfer_data(settings) + + async def read_fluorescence( # type: ignore[override] + self, + plate: Plate, + excitation_wavelengths: List[int], + emission_wavelengths: List[int], + cutoff_filters: List[int], + read_type: ReadType = ReadType.ENDPOINT, + read_order: ReadOrder = ReadOrder.COLUMN, + calibrate: Calibrate = Calibrate.ONCE, + shake_settings: Optional[ShakeSettings] = None, + carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, + read_from_bottom: bool = False, + pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, + flashes_per_well: int = 10, + kinetic_settings: Optional[KineticSettings] = None, + spectrum_settings: Optional[SpectrumSettings] = None, + cuvette: bool = False, + settling_time: int = 0, + timeout: int = 600, + ) -> List[Dict]: + """use _get_cutoff_filter_index_from_wavelength for cutoff_filters""" + settings = MolecularDevicesSettings( + plate=plate, + read_mode=ReadMode.FLU, + read_type=read_type, + read_order=read_order, + calibrate=calibrate, + shake_settings=shake_settings, + carriage_speed=carriage_speed, + read_from_bottom=read_from_bottom, + pmt_gain=pmt_gain, + flashes_per_well=flashes_per_well, + kinetic_settings=kinetic_settings, + spectrum_settings=spectrum_settings, + excitation_wavelengths=excitation_wavelengths, + emission_wavelengths=emission_wavelengths, + cutoff_filters=cutoff_filters, + cuvette=cuvette, + speed_read=False, + settling_time=settling_time, + ) + await self._set_clear() + if not cuvette: + await self._set_plate_position(settings) + await self._set_strip(settings) + await self._set_carriage_speed(settings) + + await self._set_shake(settings) + await self._set_flashes_per_well(settings) + await self._set_pmt(settings) + await self._set_wavelengths(settings) + await self._set_filter(settings) + await self._set_read_stage(settings) + await self._set_calibrate(settings) + await self._set_mode(settings) + await self._set_order(settings) + await self._set_tag(settings) + await self._set_nvram(settings) + await self._set_readtype(settings) + + await self._read_now() + await self._wait_for_idle(timeout=timeout) + return await self._transfer_data(settings) + + async def read_luminescence( # type: ignore[override] + self, + plate: Plate, + emission_wavelengths: List[int], + read_type: ReadType = ReadType.ENDPOINT, + read_order: ReadOrder = ReadOrder.COLUMN, + calibrate: Calibrate = Calibrate.ONCE, + shake_settings: Optional[ShakeSettings] = None, + carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, + read_from_bottom: bool = False, + pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, + flashes_per_well: int = 0, + kinetic_settings: Optional[KineticSettings] = None, + spectrum_settings: Optional[SpectrumSettings] = None, + cuvette: bool = False, + settling_time: int = 0, + timeout: int = 600, + ) -> List[Dict]: + settings = MolecularDevicesSettings( + plate=plate, + read_mode=ReadMode.LUM, + read_type=read_type, + read_order=read_order, + calibrate=calibrate, + shake_settings=shake_settings, + carriage_speed=carriage_speed, + read_from_bottom=read_from_bottom, + pmt_gain=pmt_gain, + flashes_per_well=flashes_per_well, + kinetic_settings=kinetic_settings, + spectrum_settings=spectrum_settings, + emission_wavelengths=emission_wavelengths, + cuvette=cuvette, + speed_read=False, + settling_time=settling_time, + ) + await self._set_clear() + await self._set_read_stage(settings) + + if not cuvette: + await self._set_plate_position(settings) + await self._set_strip(settings) + await self._set_carriage_speed(settings) + + await self._set_shake(settings) + await self._set_pmt(settings) + await self._set_wavelengths(settings) + await self._set_read_stage(settings) + await self._set_calibrate(settings) + await self._set_mode(settings) + await self._set_order(settings) + await self._set_tag(settings) + await self._set_nvram(settings) + await self._set_readtype(settings) + + await self._read_now() + await self._wait_for_idle(timeout=timeout) + return await self._transfer_data(settings) + + async def read_fluorescence_polarization( + self, + plate: Plate, + excitation_wavelengths: List[int], + emission_wavelengths: List[int], + cutoff_filters: List[int], + read_type: ReadType = ReadType.ENDPOINT, + read_order: ReadOrder = ReadOrder.COLUMN, + calibrate: Calibrate = Calibrate.ONCE, + shake_settings: Optional[ShakeSettings] = None, + carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, + read_from_bottom: bool = False, + pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, + flashes_per_well: int = 10, + kinetic_settings: Optional[KineticSettings] = None, + spectrum_settings: Optional[SpectrumSettings] = None, + cuvette: bool = False, + settling_time: int = 0, + timeout: int = 600, + ) -> List[Dict]: + settings = MolecularDevicesSettings( + plate=plate, + read_mode=ReadMode.POLAR, + read_type=read_type, + read_order=read_order, + calibrate=calibrate, + shake_settings=shake_settings, + carriage_speed=carriage_speed, + read_from_bottom=read_from_bottom, + pmt_gain=pmt_gain, + flashes_per_well=flashes_per_well, + kinetic_settings=kinetic_settings, + spectrum_settings=spectrum_settings, + excitation_wavelengths=excitation_wavelengths, + emission_wavelengths=emission_wavelengths, + cutoff_filters=cutoff_filters, + cuvette=cuvette, + speed_read=False, + settling_time=settling_time, + ) + await self._set_clear() + if not cuvette: + await self._set_plate_position(settings) + await self._set_strip(settings) + await self._set_carriage_speed(settings) + + await self._set_shake(settings) + await self._set_flashes_per_well(settings) + await self._set_pmt(settings) + await self._set_wavelengths(settings) + await self._set_filter(settings) + await self._set_read_stage(settings) + await self._set_calibrate(settings) + await self._set_mode(settings) + await self._set_order(settings) + await self._set_tag(settings) + await self._set_nvram(settings) + await self._set_readtype(settings) + + await self._read_now() + await self._wait_for_idle(timeout=timeout) + return await self._transfer_data(settings) + + async def read_time_resolved_fluorescence( + self, + plate: Plate, + excitation_wavelengths: List[int], + emission_wavelengths: List[int], + cutoff_filters: List[int], + delay_time: int, + integration_time: int, + read_type: ReadType = ReadType.ENDPOINT, + read_order: ReadOrder = ReadOrder.COLUMN, + calibrate: Calibrate = Calibrate.ONCE, + shake_settings: Optional[ShakeSettings] = None, + carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, + read_from_bottom: bool = False, + pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, + flashes_per_well: int = 50, + kinetic_settings: Optional[KineticSettings] = None, + spectrum_settings: Optional[SpectrumSettings] = None, + cuvette: bool = False, + settling_time: int = 0, + timeout: int = 600, + ) -> List[Dict]: + settings = MolecularDevicesSettings( + plate=plate, + read_mode=ReadMode.TIME, + read_type=read_type, + read_order=read_order, + calibrate=calibrate, + shake_settings=shake_settings, + carriage_speed=carriage_speed, + read_from_bottom=read_from_bottom, + pmt_gain=pmt_gain, + flashes_per_well=flashes_per_well, + kinetic_settings=kinetic_settings, + spectrum_settings=spectrum_settings, + excitation_wavelengths=excitation_wavelengths, + emission_wavelengths=emission_wavelengths, + cutoff_filters=cutoff_filters, + cuvette=cuvette, + speed_read=False, + settling_time=settling_time, + ) + await self._set_clear() + await self._set_readtype(settings) + await self._set_integration_time(settings, delay_time, integration_time) + + if not cuvette: + await self._set_plate_position(settings) + await self._set_strip(settings) + await self._set_carriage_speed(settings) + + await self._set_shake(settings) + await self._set_flashes_per_well(settings) + await self._set_pmt(settings) + await self._set_wavelengths(settings) + await self._set_filter(settings) + await self._set_calibrate(settings) + await self._set_read_stage(settings) + await self._set_mode(settings) + await self._set_order(settings) + await self._set_tag(settings) + await self._set_nvram(settings) + + await self._read_now() + await self._wait_for_idle(timeout=timeout) + return await self._transfer_data(settings) diff --git a/pylabrobot/plate_reading/molecular_devices_backend_tests.py b/pylabrobot/plate_reading/molecular_devices/backend_tests.py similarity index 99% rename from pylabrobot/plate_reading/molecular_devices_backend_tests.py rename to pylabrobot/plate_reading/molecular_devices/backend_tests.py index de0d83bdc89..0f6f0df3914 100644 --- a/pylabrobot/plate_reading/molecular_devices_backend_tests.py +++ b/pylabrobot/plate_reading/molecular_devices/backend_tests.py @@ -2,7 +2,7 @@ import unittest from unittest.mock import AsyncMock, MagicMock, call, patch -from pylabrobot.plate_reading.molecular_devices_backend import ( +from pylabrobot.plate_reading.molecular_devices.backend import ( Calibrate, CarriageSpeed, KineticSettings, diff --git a/pylabrobot/plate_reading/molecular_devices_spectramax_384_plus_backend.py b/pylabrobot/plate_reading/molecular_devices/spectramax_384_plus_backend.py similarity index 99% rename from pylabrobot/plate_reading/molecular_devices_spectramax_384_plus_backend.py rename to pylabrobot/plate_reading/molecular_devices/spectramax_384_plus_backend.py index 65d40d0875a..ce3bbc758e5 100644 --- a/pylabrobot/plate_reading/molecular_devices_spectramax_384_plus_backend.py +++ b/pylabrobot/plate_reading/molecular_devices/spectramax_384_plus_backend.py @@ -2,7 +2,7 @@ from pylabrobot.resources.plate import Plate -from .molecular_devices_backend import ( +from .backend import ( Calibrate, CarriageSpeed, KineticSettings, diff --git a/pylabrobot/plate_reading/molecular_devices_spectramax_m5_backend.py b/pylabrobot/plate_reading/molecular_devices/spectramax_m5_backend.py similarity index 76% rename from pylabrobot/plate_reading/molecular_devices_spectramax_m5_backend.py rename to pylabrobot/plate_reading/molecular_devices/spectramax_m5_backend.py index f781b3b4640..7628fbbf80d 100644 --- a/pylabrobot/plate_reading/molecular_devices_spectramax_m5_backend.py +++ b/pylabrobot/plate_reading/molecular_devices/spectramax_m5_backend.py @@ -1,4 +1,4 @@ -from .molecular_devices_backend import MolecularDevicesBackend +from .backend import MolecularDevicesBackend class MolecularDevicesSpectraMaxM5Backend(MolecularDevicesBackend): diff --git a/pylabrobot/plate_reading/molecular_devices_backend.py b/pylabrobot/plate_reading/molecular_devices_backend.py index ca94a084a71..a27f3d0263f 100644 --- a/pylabrobot/plate_reading/molecular_devices_backend.py +++ b/pylabrobot/plate_reading/molecular_devices_backend.py @@ -1,983 +1,11 @@ -import asyncio -import logging -import re -import time -from abc import ABCMeta -from dataclasses import dataclass, field -from enum import Enum -from typing import Dict, List, Literal, Optional, Tuple, Union +import warnings -from pylabrobot.io.serial import Serial -from pylabrobot.plate_reading.backend import PlateReaderBackend -from pylabrobot.resources.plate import Plate +from .molecular_devices.backend import ( # noqa: F401s + MolecularDevicesBackend, + MolecularDevicesSettings, +) -logger = logging.getLogger("pylabrobot") - -RES_TERM_CHAR = b">" -COMMAND_TERMINATORS: Dict[str, int] = { - "!AUTOFILTER": 1, - "!AUTOPMT": 1, - "!BAUD": 1, - "!CALIBRATE": 1, - "!CANCEL": 1, - "!CLEAR": 1, - "!CLOSE": 1, - "!CSPEED": 1, - "!REFERENCE": 1, - "!EMFILTER": 1, - "!EMWAVELENGTH": 1, - "!ERROR": 2, - "!EXWAVELENGTH": 1, - "!FPW": 1, - "!INIT": 1, - "!MODE": 1, - "!NVRAM": 1, - "!OPEN": 1, - "!ORDER": 1, - "OPTION": 2, - "!AIR_CAL": 1, - "!PMT": 1, - "!PMTCAL": 1, - "!QUEUE": 2, - "!READ": 1, - "!TOP": 1, - "!READSTAGE": 2, - "!READTYPE": 2, - "!RESEND": 1, - "!RESET": 1, - "!SHAKE": 1, - "!SPEED": 2, - "!STATUS": 2, - "!STRIP": 1, - "!TAG": 1, - "!TEMP": 2, - "!TRANSFER": 2, - "!USER_NUMBER": 2, - "!XPOS": 1, - "!YPOS": 1, - "!WAVELENGTH": 1, - "!WELLSCANMODE": 2, - "!PATHCAL": 2, - "!COUNTTIME": 1, - "!COUNTTIMEDELAY": 1, -} - - -class MolecularDevicesError(Exception): - """Exceptions raised by a Molecular Devices plate reader.""" - - -class MolecularDevicesUnrecognizedCommandError(MolecularDevicesError): - """Unrecognized command errors sent from the computer.""" - - -class MolecularDevicesFirmwareError(MolecularDevicesError): - """Firmware errors.""" - - -class MolecularDevicesHardwareError(MolecularDevicesError): - """Hardware errors.""" - - -class MolecularDevicesMotionError(MolecularDevicesError): - """Motion errors.""" - - -class MolecularDevicesNVRAMError(MolecularDevicesError): - """NVRAM errors.""" - - -ERROR_CODES: Dict[int, Tuple[str, type]] = { - 100: ("command not found", MolecularDevicesUnrecognizedCommandError), - 101: ("invalid argument", MolecularDevicesUnrecognizedCommandError), - 102: ("too many arguments", MolecularDevicesUnrecognizedCommandError), - 103: ("not enough arguments", MolecularDevicesUnrecognizedCommandError), - 104: ("input line too long", MolecularDevicesUnrecognizedCommandError), - 105: ("command invalid, system busy", MolecularDevicesUnrecognizedCommandError), - 106: ("command invalid, measurement in progress", MolecularDevicesUnrecognizedCommandError), - 107: ("no data to transfer", MolecularDevicesUnrecognizedCommandError), - 108: ("data buffer full", MolecularDevicesUnrecognizedCommandError), - 109: ("error buffer overflow", MolecularDevicesUnrecognizedCommandError), - 110: ("stray light cuvette, door open?", MolecularDevicesUnrecognizedCommandError), - 111: ("invalid read settings", MolecularDevicesUnrecognizedCommandError), - 200: ("assert failed", MolecularDevicesFirmwareError), - 201: ("bad error number", MolecularDevicesFirmwareError), - 202: ("receive queue overflow", MolecularDevicesFirmwareError), - 203: ("serial port parity error", MolecularDevicesFirmwareError), - 204: ("serial port overrun error", MolecularDevicesFirmwareError), - 205: ("serial port framing error", MolecularDevicesFirmwareError), - 206: ("cmd generated too much output", MolecularDevicesFirmwareError), - 207: ("fatal trap", MolecularDevicesFirmwareError), - 208: ("RTOS error", MolecularDevicesFirmwareError), - 209: ("stack overflow", MolecularDevicesFirmwareError), - 210: ("unknown interrupt", MolecularDevicesFirmwareError), - 300: ("thermistor faulty", MolecularDevicesHardwareError), - 301: ("safe temperature limit exceeded", MolecularDevicesHardwareError), - 302: ("low light", MolecularDevicesHardwareError), - 303: ("unable to cal dark current", MolecularDevicesHardwareError), - 304: ("signal level saturation", MolecularDevicesHardwareError), - 305: ("reference level saturation", MolecularDevicesHardwareError), - 306: ("plate air cal fail, low light", MolecularDevicesHardwareError), - 307: ("cuv air ref fail", MolecularDevicesHardwareError), - 308: ("stray light", MolecularDevicesHardwareError), - 312: ("gain calibration failed", MolecularDevicesHardwareError), - 313: ("reference gain check fail", MolecularDevicesHardwareError), - 314: ("low lamp level warning", MolecularDevicesHardwareError), - 315: ("can't find zero order", MolecularDevicesHardwareError), - 316: ("grating motor driver faulty", MolecularDevicesHardwareError), - 317: ("monitor ADC faulty", MolecularDevicesHardwareError), - 400: ("carriage motion error", MolecularDevicesMotionError), - 401: ("filter wheel error", MolecularDevicesMotionError), - 402: ("grating error", MolecularDevicesMotionError), - 403: ("stage error", MolecularDevicesMotionError), - 500: ("NVRAM CRC corrupt", MolecularDevicesNVRAMError), - 501: ("NVRAM Grating cal data bad", MolecularDevicesNVRAMError), - 502: ("NVRAM Cuvette air cal data error", MolecularDevicesNVRAMError), - 503: ("NVRAM Plate air cal data error", MolecularDevicesNVRAMError), - 504: ("NVRAM Carriage offset error", MolecularDevicesNVRAMError), - 505: ("NVRAM Stage offset error", MolecularDevicesNVRAMError), -} - - -MolecularDevicesResponse = List[str] - - -class ReadMode(Enum): - """The read mode of the plate reader (e.g., Absorbance, Fluorescence).""" - - ABS = "ABS" - FLU = "FLU" - LUM = "LUM" - POLAR = "POLAR" - TIME = "TIME" - - -class ReadType(Enum): - """The type of read to perform (e.g., Endpoint, Kinetic).""" - - ENDPOINT = "ENDPOINT" - KINETIC = "KINETIC" - SPECTRUM = "SPECTRUM" - WELL_SCAN = "WELLSCAN" - - -class ReadOrder(Enum): - """The order in which to read the plate wells.""" - - COLUMN = "COLUMN" - WAVELENGTH = "WAVELENGTH" - - -class Calibrate(Enum): - """The calibration mode for the read.""" - - ON = "ON" - ONCE = "ONCE" - OFF = "OFF" - - -class CarriageSpeed(Enum): - """The speed of the plate carriage.""" - - NORMAL = "8" - SLOW = "1" - - -class PmtGain(Enum): - """The photomultiplier tube gain setting.""" - - AUTO = "ON" - HIGH = "HIGH" - MEDIUM = "MED" - LOW = "LOW" - - -@dataclass -class ShakeSettings: - """Settings for shaking the plate during a read.""" - - before_read: bool = False - before_read_duration: int = 0 - between_reads: bool = False - between_reads_duration: int = 0 - - -@dataclass -class KineticSettings: - """Settings for kinetic reads.""" - - interval: int - num_readings: int - - -@dataclass -class SpectrumSettings: - """Settings for spectrum reads.""" - - start_wavelength: int - step: int - num_steps: int - excitation_emission_type: Optional[Literal["EXSPECTRUM", "EMSPECTRUM"]] = None - - -@dataclass -class MolecularDevicesSettings: - """A comprehensive, internal container for all plate reader settings.""" - - plate: Plate = field(repr=False) - read_mode: ReadMode - read_type: ReadType - read_order: ReadOrder - calibrate: Calibrate - shake_settings: Optional[ShakeSettings] - carriage_speed: CarriageSpeed - speed_read: bool - kinetic_settings: Optional[KineticSettings] - spectrum_settings: Optional[SpectrumSettings] - wavelengths: List[Union[int, Tuple[int, bool]]] = field(default_factory=list) - excitation_wavelengths: List[int] = field(default_factory=list) - emission_wavelengths: List[int] = field(default_factory=list) - cutoff_filters: List[int] = field(default_factory=list) - path_check: bool = False - read_from_bottom: bool = False - pmt_gain: Union[PmtGain, int] = PmtGain.AUTO - flashes_per_well: int = 1 - cuvette: bool = False - settling_time: int = 0 - - -class MolecularDevicesBackend(PlateReaderBackend, metaclass=ABCMeta): - """Backend for Molecular Devices plate readers.""" - - def __init__(self, port: str) -> None: - self.port = port - self.io = Serial(self.port, baudrate=9600, timeout=0.2) - - async def setup(self) -> None: - await self.io.setup() - await self.send_command("!") - - async def stop(self) -> None: - await self.io.stop() - - def serialize(self) -> dict: - return {**super().serialize(), "port": self.port} - - async def send_command( - self, command: str, timeout: int = 60, num_res_fields=None - ) -> MolecularDevicesResponse: - """Send a command and receive the response, automatically determining the number of - response fields. - """ - base_command = command.split(" ")[0] - if num_res_fields is None: - num_res_fields = COMMAND_TERMINATORS.get(base_command, 1) - else: - num_res_fields = max(1, num_res_fields) - - await self.io.write(command.encode() + b"\r") - raw_response = b"" - timeout_time = time.time() + timeout - while True: - raw_response += await self.io.readline() - await asyncio.sleep(0.001) - if time.time() > timeout_time: - raise TimeoutError(f"Timeout waiting for response to command: {command}") - if raw_response.count(RES_TERM_CHAR) >= num_res_fields: - break - logger.debug("[plate reader] Command: %s, Response: %s", command, raw_response) - response = raw_response.decode("utf-8").strip().split(RES_TERM_CHAR.decode()) - response = [r.strip() for r in response if r.strip() != ""] - self._parse_basic_errors(response, command) - return response - - def _parse_basic_errors(self, response: List[str], command: str) -> None: - if not response: - raise MolecularDevicesError(f"Command '{command}' failed with empty response.") - - # Check for FAIL in the response - error_code_msg = response[0] if "FAIL" in response[0] else response[-1] - if "FAIL" in error_code_msg: - parts = error_code_msg.split("\t") - try: - error_code_str = parts[-1] - error_code = int(error_code_str.strip()) - if error_code in ERROR_CODES: - message, err_class = ERROR_CODES[error_code] - raise err_class(f"Command '{command}' failed with error {error_code}: {message}") - raise MolecularDevicesError( - f"Command '{command}' failed with unknown error code: {error_code}" - ) - except (ValueError, IndexError): - raise MolecularDevicesError( - f"Command '{command}' failed with unparsable error: {response[0]}" - ) - - if "OK" not in response[0]: - raise MolecularDevicesError(f"Command '{command}' failed with response: {response}") - if "warning" in response[0].lower(): - logger.warning("Warning for command '%s': %s", command, response) - - async def open(self) -> None: - await self.send_command("!OPEN") - - async def close(self, plate: Optional[Plate] = None) -> None: - await self.send_command("!CLOSE") - - async def get_status(self) -> List[str]: - res = await self.send_command("!STATUS") - return res[1].split() - - async def read_error_log(self) -> str: - res = await self.send_command("!ERROR") - return res[1] - - async def clear_error_log(self) -> None: - await self.send_command("!CLEAR ERROR") - - async def get_temperature(self) -> Tuple[float, float]: - res = await self.send_command("!TEMP") - parts = res[1].split() - return (float(parts[1]), float(parts[0])) # current, set_point - - async def set_temperature(self, temperature: float) -> None: - if not (0 <= temperature <= 45): - raise ValueError("Temperature must be between 0 and 45°C.") - await self.send_command(f"!TEMP {temperature}") - - async def get_firmware_version(self) -> List[str]: - res = await self.send_command("!OPTION") - return res[1].split() - - async def start_shake(self) -> None: - await self.send_command("!SHAKE NOW") - - async def stop_shake(self) -> None: - await self.send_command("!SHAKE STOP") - - async def _read_now(self) -> None: - await self.send_command("!READ") - - async def _transfer_data(self, settings: MolecularDevicesSettings) -> List[Dict]: - """Transfer data from the plate reader. For kinetic/spectrum reads, this will transfer data for each - reading and combine them into a single collection. - """ - - if (settings.read_type == ReadType.KINETIC and settings.kinetic_settings) or ( - settings.read_type == ReadType.SPECTRUM and settings.spectrum_settings - ): - if settings.kinetic_settings: - num_readings = settings.kinetic_settings.num_readings - elif settings.spectrum_settings: - num_readings = settings.spectrum_settings.num_steps - else: - raise ValueError("Kinetic or Spectrum settings must be provided for this read type.") - - all_reads = [] - for _ in range(num_readings): - res = await self.send_command("!TRANSFER") - data_str = res[1] - read_data = self._parse_data(data_str, settings) - all_reads.extend(read_data) # Unpack the list - return all_reads - - # For ENDPOINT - res = await self.send_command("!TRANSFER") - data_str = res[1] - return self._parse_data(data_str, settings) - - def _parse_data(self, data_str: str, settings: MolecularDevicesSettings) -> List[Dict]: - lines = re.split(r"\r\n|\n", data_str.strip()) - lines = [line.strip() for line in lines if line.strip()] - - # 1. Parse header - header_parts = lines[0].split("\t") - measurement_time = float(header_parts[0]) - temperature = float(header_parts[1]) - - # 2. Parse wavelengths - line_idx = 1 - while line_idx < len(lines): - line = lines[line_idx] - if line.startswith("L:") and line_idx > 1: - # Data section started - break - line_idx += 1 - - data_collection = [] - cur_read_wavelengths = [] - # 3. Parse data - data_columns: List[List[float]] = [] - # The data section starts at line_idx - for i in range(line_idx, len(lines)): - line = lines[i] - if line.startswith("L:"): - # start of a new data with different wavelength - cur_read_wavelengths.append(line.split("\t")[1:]) - if i > line_idx and data_columns: - data_collection.append(data_columns) - data_columns = [] - match = re.match(r"^\s*(\d+):\s*(.*)", line) - if match: - values_str = re.split(r"\s+", match.group(2).strip()) - values = [] - for v in values_str: - if v.strip().replace(".", "", 1).isdigit(): - values.append(float(v.strip())) - elif v.strip() == "#SAT": - values.append(float("inf")) - else: - values.append(float("nan")) - data_columns.append(values) - if data_columns: - data_collection.append(data_columns) - - # 4. Transpose data to be row-major - data_collection_transposed = [] - for data_columns in data_collection: - data_rows = [] - if data_columns: - num_rows = len(data_columns[0]) - num_cols = len(data_columns) - for i in range(num_rows): - row = [data_columns[j][i] for j in range(num_cols)] - data_rows.append(row) - data_collection_transposed.append(data_rows) - - measurements = [] - read_mode = settings.read_mode - for i, data_rows in enumerate(data_collection_transposed): - measurement = { - "data": data_rows, - "temperature": temperature, - "time": measurement_time, - } - if read_mode == ReadMode.ABS: - wl = int(cur_read_wavelengths[i][0]) - measurement["wavelength"] = wl - elif read_mode == ReadMode.FLU or read_mode == ReadMode.POLAR or read_mode == ReadMode.TIME: - ex_wl = int(cur_read_wavelengths[i][0]) - em_wl = int(cur_read_wavelengths[i][1]) - measurement["ex_wavelength"] = ex_wl - measurement["em_wavelength"] = em_wl - elif read_mode == ReadMode.LUM: - em_wl = int(cur_read_wavelengths[i][1]) - measurement["em_wavelength"] = em_wl - measurements.append(measurement) - - return measurements - - async def _set_clear(self) -> None: - await self.send_command("!CLEAR DATA") - - async def _set_mode(self, settings: MolecularDevicesSettings) -> None: - cmd = f"!MODE {settings.read_type.value}" - if settings.read_type == ReadType.KINETIC and settings.kinetic_settings: - ks = settings.kinetic_settings - cmd += f" {ks.interval} {ks.num_readings}" - elif settings.read_type == ReadType.SPECTRUM and settings.spectrum_settings: - ss = settings.spectrum_settings - cmd = "!MODE" - scan_type = ss.excitation_emission_type or "SPECTRUM" - cmd += f" {scan_type} {ss.start_wavelength} {ss.step} {ss.num_steps}" - await self.send_command(cmd) - - async def _set_wavelengths(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode == ReadMode.ABS: - wl_parts = [] - for wl in settings.wavelengths: - wl_parts.append(f"F{wl[0]}" if isinstance(wl, tuple) and wl[1] else str(wl)) - wl_str = " ".join(wl_parts) - if settings.path_check: - wl_str += " 900 998" - await self.send_command(f"!WAVELENGTH {wl_str}") - elif settings.read_mode in (ReadMode.FLU, ReadMode.POLAR, ReadMode.TIME): - ex_wl_str = " ".join(map(str, settings.excitation_wavelengths)) - em_wl_str = " ".join(map(str, settings.emission_wavelengths)) - await self.send_command(f"!EXWAVELENGTH {ex_wl_str}") - await self.send_command(f"!EMWAVELENGTH {em_wl_str}") - elif settings.read_mode == ReadMode.LUM: - wl_str = " ".join(map(str, settings.emission_wavelengths)) - await self.send_command(f"!EMWAVELENGTH {wl_str}") - else: - raise NotImplementedError("f{settings.read_mode} not supported") - - async def _set_plate_position(self, settings: MolecularDevicesSettings) -> None: - plate = settings.plate - num_cols, num_rows, size_y = plate.num_items_x, plate.num_items_y, plate.get_size_y() - if num_cols < 2 or num_rows < 2: - raise ValueError("Plate must have at least 2 rows and 2 columns to calculate well spacing.") - top_left_well = plate.get_item(0) - if top_left_well.location is None: - raise ValueError("Top left well location is not set.") - top_left_well_center = top_left_well.location + top_left_well.get_anchor(x="c", y="c") - loc_A1 = plate.get_item("A1").location - loc_A2 = plate.get_item("A2").location - loc_B1 = plate.get_item("B1").location - if loc_A1 is None or loc_A2 is None or loc_B1 is None: - raise ValueError("Well locations for A1, A2, or B1 are not set.") - dx = loc_A2.x - loc_A1.x - dy = loc_A1.y - loc_B1.y - - x_pos_cmd = f"!XPOS {top_left_well_center.x:.3f} {dx:.3f} {num_cols}" - y_pos_cmd = f"!YPOS {size_y - top_left_well_center.y:.3f} {dy:.3f} {num_rows}" - await self.send_command(x_pos_cmd) - await self.send_command(y_pos_cmd) - - async def _set_strip(self, settings: MolecularDevicesSettings) -> None: - await self.send_command(f"!STRIP 1 {settings.plate.num_items_x}") - - async def _set_shake(self, settings: MolecularDevicesSettings) -> None: - if not settings.shake_settings: - await self.send_command("!SHAKE OFF") - return - ss = settings.shake_settings - shake_mode = "ON" if ss.before_read or ss.between_reads else "OFF" - before_duration = ss.before_read_duration if ss.before_read else 0 - ki = settings.kinetic_settings.interval if settings.kinetic_settings else 0 - if ss.between_reads and ki > 0: - between_duration = ss.between_reads_duration - wait_duration = ki - between_duration - else: - between_duration = 0 - wait_duration = 0 - await self.send_command(f"!SHAKE {shake_mode}") - await self.send_command(f"!SHAKE {before_duration} {ki} {wait_duration} {between_duration} 0") - - async def _set_carriage_speed(self, settings: MolecularDevicesSettings) -> None: - await self.send_command(f"!CSPEED {settings.carriage_speed.value}") - - async def _set_read_stage(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode in (ReadMode.FLU, ReadMode.LUM, ReadMode.POLAR, ReadMode.TIME): - stage = "BOT" if settings.read_from_bottom else "TOP" - await self.send_command(f"!READSTAGE {stage}") - - async def _set_flashes_per_well(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode in (ReadMode.FLU, ReadMode.LUM, ReadMode.POLAR, ReadMode.TIME): - await self.send_command(f"!FPW {settings.flashes_per_well}") - - async def _set_pmt(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode not in (ReadMode.FLU, ReadMode.LUM, ReadMode.POLAR, ReadMode.TIME): - return - gain = settings.pmt_gain - if gain == PmtGain.AUTO: - await self.send_command("!AUTOPMT ON") - else: - gain_val = gain.value if isinstance(gain, PmtGain) else gain - await self.send_command("!AUTOPMT OFF") - await self.send_command(f"!PMT {gain_val}") - - async def _set_filter(self, settings: MolecularDevicesSettings) -> None: - if ( - settings.read_mode in (ReadMode.FLU, ReadMode.POLAR, ReadMode.TIME) - and settings.cutoff_filters - ): - cf_str = " ".join(map(str, settings.cutoff_filters)) - await self.send_command("!AUTOFILTER OFF") - await self.send_command(f"!EMFILTER {cf_str}") - else: - await self.send_command("!AUTOFILTER ON") - - async def _set_calibrate(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode == ReadMode.ABS: - await self.send_command(f"!CALIBRATE {settings.calibrate.value}") - else: - await self.send_command(f"!PMTCAL {settings.calibrate.value}") - - async def _set_order(self, settings: MolecularDevicesSettings) -> None: - await self.send_command(f"!ORDER {settings.read_order.value}") - - async def _set_speed(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode == ReadMode.ABS: - mode = "ON" if settings.speed_read else "OFF" - await self.send_command(f"!SPEED {mode}") - - async def _set_nvram(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode == ReadMode.POLAR: - command = "FPSETTLETIME" - value = settings.settling_time - else: - command = "CARCOL" - value = settings.settling_time if settings.settling_time > 100 else 100 - await self.send_command(f"!NVRAM {command} {value}") - - async def _set_tag(self, settings: MolecularDevicesSettings) -> None: - if settings.read_mode == ReadMode.POLAR and settings.read_type == ReadType.KINETIC: - await self.send_command("!TAG ON") - else: - await self.send_command("!TAG OFF") - - async def _set_readtype(self, settings: MolecularDevicesSettings) -> None: - """Set the READTYPE command and the expected number of response fields.""" - cuvette = settings.cuvette - num_res_fields = COMMAND_TERMINATORS.get("!READTYPE", 2) - - if settings.read_mode == ReadMode.ABS: - cmd = f"!READTYPE ABS{'CUV' if cuvette else 'PLA'}" - elif settings.read_mode == ReadMode.FLU: - cmd = f"!READTYPE FLU{'CUV' if cuvette else ''}" - num_res_fields = 2 if cuvette else 1 - elif settings.read_mode == ReadMode.LUM: - cmd = f"!READTYPE LUM{'CUV' if cuvette else ''}" - num_res_fields = 2 if cuvette else 1 - elif settings.read_mode == ReadMode.POLAR: - cmd = "!READTYPE POLAR" - num_res_fields = 1 - elif settings.read_mode == ReadMode.TIME: - cmd = "!READTYPE TIME 0 250" - num_res_fields = 1 - else: - raise ValueError(f"Unsupported read mode: {settings.read_mode}") - - await self.send_command(cmd, num_res_fields=num_res_fields) - - async def _set_integration_time( - self, settings: MolecularDevicesSettings, delay_time: int, integration_time: int - ) -> None: - if settings.read_mode == ReadMode.TIME: - await self.send_command(f"!COUNTTIMEDELAY {delay_time}") - await self.send_command(f"!COUNTTIME {integration_time * 0.001}") - - def _get_cutoff_filter_index_from_wavelength(self, wavelength: int) -> int: - """Converts a wavelength to a cutoff filter index.""" - # This map is a direct translation of the `EmissionCutoff.CutoffFilter` in MaxlineModel.cs - # (min_wavelength, max_wavelength, cutoff_filter_index) - FILTERS = [ - (0, 322, 1), - (325, 415, 16), - (420, 435, 2), - (435, 455, 3), - (455, 475, 4), - (475, 495, 5), - (495, 515, 6), - (515, 530, 7), - (530, 550, 8), - (550, 570, 9), - (570, 590, 10), - (590, 610, 11), - (610, 630, 12), - (630, 665, 13), - (665, 695, 14), - (695, 900, 15), - ] - for min_wl, max_wl, cutoff_filter_index in FILTERS: - if min_wl <= wavelength < max_wl: - return cutoff_filter_index - raise ValueError(f"No cutoff filter found for wavelength {wavelength}") - - async def _wait_for_idle(self, timeout: int = 600): - """Wait for the plate reader to become idle.""" - start_time = time.time() - while True: - if time.time() - start_time > timeout: - raise TimeoutError("Timeout waiting for plate reader to become idle.") - status = await self.get_status() - if status and status[1] == "IDLE": - break - await asyncio.sleep(1) - - async def read_absorbance( # type: ignore[override] - self, - plate: Plate, - wavelengths: List[Union[int, Tuple[int, bool]]], - read_type: ReadType = ReadType.ENDPOINT, - read_order: ReadOrder = ReadOrder.COLUMN, - calibrate: Calibrate = Calibrate.ONCE, - shake_settings: Optional[ShakeSettings] = None, - carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, - speed_read: bool = False, - path_check: bool = False, - kinetic_settings: Optional[KineticSettings] = None, - spectrum_settings: Optional[SpectrumSettings] = None, - cuvette: bool = False, - settling_time: int = 0, - timeout: int = 600, - ) -> List[Dict]: - settings = MolecularDevicesSettings( - plate=plate, - read_mode=ReadMode.ABS, - read_type=read_type, - read_order=read_order, - calibrate=calibrate, - shake_settings=shake_settings, - carriage_speed=carriage_speed, - speed_read=speed_read, - path_check=path_check, - kinetic_settings=kinetic_settings, - spectrum_settings=spectrum_settings, - wavelengths=wavelengths, - cuvette=cuvette, - settling_time=settling_time, - ) - await self._set_clear() - if not cuvette: - await self._set_plate_position(settings) - await self._set_strip(settings) - await self._set_carriage_speed(settings) - - await self._set_shake(settings) - await self._set_wavelengths(settings) - await self._set_calibrate(settings) - await self._set_mode(settings) - await self._set_order(settings) - await self._set_speed(settings) - await self._set_tag(settings) - await self._set_nvram(settings) - await self._set_readtype(settings) - - await self._read_now() - await self._wait_for_idle(timeout=timeout) - return await self._transfer_data(settings) - - async def read_fluorescence( # type: ignore[override] - self, - plate: Plate, - excitation_wavelengths: List[int], - emission_wavelengths: List[int], - cutoff_filters: List[int], - read_type: ReadType = ReadType.ENDPOINT, - read_order: ReadOrder = ReadOrder.COLUMN, - calibrate: Calibrate = Calibrate.ONCE, - shake_settings: Optional[ShakeSettings] = None, - carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, - read_from_bottom: bool = False, - pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, - flashes_per_well: int = 10, - kinetic_settings: Optional[KineticSettings] = None, - spectrum_settings: Optional[SpectrumSettings] = None, - cuvette: bool = False, - settling_time: int = 0, - timeout: int = 600, - ) -> List[Dict]: - """use _get_cutoff_filter_index_from_wavelength for cutoff_filters""" - settings = MolecularDevicesSettings( - plate=plate, - read_mode=ReadMode.FLU, - read_type=read_type, - read_order=read_order, - calibrate=calibrate, - shake_settings=shake_settings, - carriage_speed=carriage_speed, - read_from_bottom=read_from_bottom, - pmt_gain=pmt_gain, - flashes_per_well=flashes_per_well, - kinetic_settings=kinetic_settings, - spectrum_settings=spectrum_settings, - excitation_wavelengths=excitation_wavelengths, - emission_wavelengths=emission_wavelengths, - cutoff_filters=cutoff_filters, - cuvette=cuvette, - speed_read=False, - settling_time=settling_time, - ) - await self._set_clear() - if not cuvette: - await self._set_plate_position(settings) - await self._set_strip(settings) - await self._set_carriage_speed(settings) - - await self._set_shake(settings) - await self._set_flashes_per_well(settings) - await self._set_pmt(settings) - await self._set_wavelengths(settings) - await self._set_filter(settings) - await self._set_read_stage(settings) - await self._set_calibrate(settings) - await self._set_mode(settings) - await self._set_order(settings) - await self._set_tag(settings) - await self._set_nvram(settings) - await self._set_readtype(settings) - - await self._read_now() - await self._wait_for_idle(timeout=timeout) - return await self._transfer_data(settings) - - async def read_luminescence( # type: ignore[override] - self, - plate: Plate, - emission_wavelengths: List[int], - read_type: ReadType = ReadType.ENDPOINT, - read_order: ReadOrder = ReadOrder.COLUMN, - calibrate: Calibrate = Calibrate.ONCE, - shake_settings: Optional[ShakeSettings] = None, - carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, - read_from_bottom: bool = False, - pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, - flashes_per_well: int = 0, - kinetic_settings: Optional[KineticSettings] = None, - spectrum_settings: Optional[SpectrumSettings] = None, - cuvette: bool = False, - settling_time: int = 0, - timeout: int = 600, - ) -> List[Dict]: - settings = MolecularDevicesSettings( - plate=plate, - read_mode=ReadMode.LUM, - read_type=read_type, - read_order=read_order, - calibrate=calibrate, - shake_settings=shake_settings, - carriage_speed=carriage_speed, - read_from_bottom=read_from_bottom, - pmt_gain=pmt_gain, - flashes_per_well=flashes_per_well, - kinetic_settings=kinetic_settings, - spectrum_settings=spectrum_settings, - emission_wavelengths=emission_wavelengths, - cuvette=cuvette, - speed_read=False, - settling_time=settling_time, - ) - await self._set_clear() - await self._set_read_stage(settings) - - if not cuvette: - await self._set_plate_position(settings) - await self._set_strip(settings) - await self._set_carriage_speed(settings) - - await self._set_shake(settings) - await self._set_pmt(settings) - await self._set_wavelengths(settings) - await self._set_read_stage(settings) - await self._set_calibrate(settings) - await self._set_mode(settings) - await self._set_order(settings) - await self._set_tag(settings) - await self._set_nvram(settings) - await self._set_readtype(settings) - - await self._read_now() - await self._wait_for_idle(timeout=timeout) - return await self._transfer_data(settings) - - async def read_fluorescence_polarization( - self, - plate: Plate, - excitation_wavelengths: List[int], - emission_wavelengths: List[int], - cutoff_filters: List[int], - read_type: ReadType = ReadType.ENDPOINT, - read_order: ReadOrder = ReadOrder.COLUMN, - calibrate: Calibrate = Calibrate.ONCE, - shake_settings: Optional[ShakeSettings] = None, - carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, - read_from_bottom: bool = False, - pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, - flashes_per_well: int = 10, - kinetic_settings: Optional[KineticSettings] = None, - spectrum_settings: Optional[SpectrumSettings] = None, - cuvette: bool = False, - settling_time: int = 0, - timeout: int = 600, - ) -> List[Dict]: - settings = MolecularDevicesSettings( - plate=plate, - read_mode=ReadMode.POLAR, - read_type=read_type, - read_order=read_order, - calibrate=calibrate, - shake_settings=shake_settings, - carriage_speed=carriage_speed, - read_from_bottom=read_from_bottom, - pmt_gain=pmt_gain, - flashes_per_well=flashes_per_well, - kinetic_settings=kinetic_settings, - spectrum_settings=spectrum_settings, - excitation_wavelengths=excitation_wavelengths, - emission_wavelengths=emission_wavelengths, - cutoff_filters=cutoff_filters, - cuvette=cuvette, - speed_read=False, - settling_time=settling_time, - ) - await self._set_clear() - if not cuvette: - await self._set_plate_position(settings) - await self._set_strip(settings) - await self._set_carriage_speed(settings) - - await self._set_shake(settings) - await self._set_flashes_per_well(settings) - await self._set_pmt(settings) - await self._set_wavelengths(settings) - await self._set_filter(settings) - await self._set_read_stage(settings) - await self._set_calibrate(settings) - await self._set_mode(settings) - await self._set_order(settings) - await self._set_tag(settings) - await self._set_nvram(settings) - await self._set_readtype(settings) - - await self._read_now() - await self._wait_for_idle(timeout=timeout) - return await self._transfer_data(settings) - - async def read_time_resolved_fluorescence( - self, - plate: Plate, - excitation_wavelengths: List[int], - emission_wavelengths: List[int], - cutoff_filters: List[int], - delay_time: int, - integration_time: int, - read_type: ReadType = ReadType.ENDPOINT, - read_order: ReadOrder = ReadOrder.COLUMN, - calibrate: Calibrate = Calibrate.ONCE, - shake_settings: Optional[ShakeSettings] = None, - carriage_speed: CarriageSpeed = CarriageSpeed.NORMAL, - read_from_bottom: bool = False, - pmt_gain: Union[PmtGain, int] = PmtGain.AUTO, - flashes_per_well: int = 50, - kinetic_settings: Optional[KineticSettings] = None, - spectrum_settings: Optional[SpectrumSettings] = None, - cuvette: bool = False, - settling_time: int = 0, - timeout: int = 600, - ) -> List[Dict]: - settings = MolecularDevicesSettings( - plate=plate, - read_mode=ReadMode.TIME, - read_type=read_type, - read_order=read_order, - calibrate=calibrate, - shake_settings=shake_settings, - carriage_speed=carriage_speed, - read_from_bottom=read_from_bottom, - pmt_gain=pmt_gain, - flashes_per_well=flashes_per_well, - kinetic_settings=kinetic_settings, - spectrum_settings=spectrum_settings, - excitation_wavelengths=excitation_wavelengths, - emission_wavelengths=emission_wavelengths, - cutoff_filters=cutoff_filters, - cuvette=cuvette, - speed_read=False, - settling_time=settling_time, - ) - await self._set_clear() - await self._set_readtype(settings) - await self._set_integration_time(settings, delay_time, integration_time) - - if not cuvette: - await self._set_plate_position(settings) - await self._set_strip(settings) - await self._set_carriage_speed(settings) - - await self._set_shake(settings) - await self._set_flashes_per_well(settings) - await self._set_pmt(settings) - await self._set_wavelengths(settings) - await self._set_filter(settings) - await self._set_calibrate(settings) - await self._set_read_stage(settings) - await self._set_mode(settings) - await self._set_order(settings) - await self._set_tag(settings) - await self._set_nvram(settings) - - await self._read_now() - await self._wait_for_idle(timeout=timeout) - return await self._transfer_data(settings) +warnings.warn( + "pylabrobot.plate_reading.molecular_devices_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.molecular_devices.molecular_devices_backend instead.", +) diff --git a/pylabrobot/plate_reading/spectramax_384_plus_backend.py b/pylabrobot/plate_reading/spectramax_384_plus_backend.py new file mode 100644 index 00000000000..b209792ed7b --- /dev/null +++ b/pylabrobot/plate_reading/spectramax_384_plus_backend.py @@ -0,0 +1,10 @@ +import warnings + +from .molecular_devices.spectramax_384_plus_backend import ( + MolecularDevicesSpectraMax384PlusBackend, # noqa: F401 +) + +warnings.warn( + "pylabrobot.plate_reading.spectramax_384_plus_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.molecular_devices.spectramax_384_plus_backend instead.", +) diff --git a/pylabrobot/plate_reading/spectramax_m5_backend.py b/pylabrobot/plate_reading/spectramax_m5_backend.py new file mode 100644 index 00000000000..b58cf75ae0a --- /dev/null +++ b/pylabrobot/plate_reading/spectramax_m5_backend.py @@ -0,0 +1,10 @@ +import warnings + +from .molecular_devices.spectramax_m5_backend import ( + MolecularDevicesSpectraMaxM5Backend, # noqa: F401 +) + +warnings.warn( + "pylabrobot.plate_reading.spectramax_m5_backend is deprecated and will be removed in a future release. " + "Please use pylabrobot.plate_reading.molecular_devices.spectramax_m5_backend instead.", +)