diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 521fa5adaac..c7a4ac0703d 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -17,6 +17,7 @@ Literal, Optional, Sequence, + Tuple, Type, TypeVar, Union, @@ -69,6 +70,7 @@ TipRack, TipSpot, Well, + standard_volume_tip_with_filter, ) from pylabrobot.resources.barcode import Barcode, Barcode1DSymbology from pylabrobot.resources.errors import ( @@ -1671,83 +1673,310 @@ async def probe_liquid_heights( self, containers: List[Container], use_channels: List[int], - tips: List[HamiltonTip], resource_offsets: Optional[List[Coordinate]] = None, + lld_mode: Optional[List[LLDMode]] = None, + lld_search_height: Optional[List[float]] = None, + minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, + minimum_height: Optional[List[float]] = None, + min_z_endpos: Optional[float] = None, + traversal_height: Optional[float] = None, + post_detection_distance: float = 2.0, + swap_speed: Optional[List[float]] = None, + n_replicates: int = 1, + return_mean: bool = True, move_to_z_safety_after: bool = True, - ) -> List[float]: - """Probe liquid heights for the specified channels. + ) -> Union[List[float], List[Tuple[float, ...]]]: + """ + Probe liquid surface heights in one or more containers using Hamilton STAR + Liquid Level Detection (LLD). + + This method performs one or more zero-volume aspirate operations (LLD-only probe + moves) on the specified channels and records the detected liquid surface height. + Both capacitive (LLDMode=1) and pressure-based (LLDMode=2) detection modes are + supported. + + For each replicate, the absolute Z position reported by the STAR at the moment of + LLD detection is converted into a height relative to the container cavity bottom. + Negative heights are clamped to zero as a safety hedge against geometry definition + errors. + + Parameters + ---------- + containers: + Containers to probe. All containers must share the same X-coordinate for safety reasons. + use_channels: + STAR channels to use for probing. Must correspond one-to-one with `containers`. + resource_offsets: + Optional per-container XY offsets applied during probing. Defaults to zero offsets. + lld_mode: + Per-channel LLD mode (1 = capacitive, 2 = pressure-based). Defaults to cLLD. + lld_search_height: + Absolute Z height (deck coordinates) from which the LLD search begins. Defaults to + 5 mm above the container top. + post_detection_distance: + Distance (in mm) the tip is allowed to continue moving downward after LLD + detection. Defaults to 2 mm. + swap_speed: + Optional per-channel Z movement speed during probing. Defaults to 100 mm/s. + n_replicates: + Number of repeated LLD probe operations per channel. Defaults to 1. + return_mean: + If True, return the mean liquid height per channel. If False, return all + replicate heights per channel. Defaults to True. + move_to_z_safety_after: + If True, move all channels to Z safety after probing completes. Defaults to True. + + Returns + ------- + List[float] or List[Tuple[float, ...]] + If `return_mean` is True, returns mean liquid heights (mm) per channel. + Otherwise, returns per-channel tuples of replicate heights. + """ + + # Default offsets + resource_offsets = resource_offsets or [Coordinate.zero() for _ in range(len(containers))] + + # Default LLD mode == capacitive LLD + if lld_mode is None: + lld_mode = [self.LLDMode(1)] * len(containers) - Moves the channels to the x and y positions of the containers, then probes the liquid height - using the CLLD function. + assert ( + len(containers) == len(use_channels) == len(resource_offsets) == len(lld_mode) + ), f"{containers=}, {use_channels=}, {resource_offsets=}, and {lld_mode=} must be same length" - Returns the liquid height in each well in mm with respect to the bottom of the container cavity. - Returns `None` for channels where the liquid height could not be determined. - """ + # Validate individual modes + for mode in lld_mode: + assert mode in [ + self.LLDMode(1), + self.LLDMode(2), + ], f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {mode}" - if any(not resource.supports_compute_height_volume_functions() for resource in containers): - raise ValueError( - "automatic_surface_following can only be used with containers that support height<->volume functions." - ) + if lld_search_height is None: + lld_search_height = [ + c.get_location_wrt(self.deck, "c", "c", z="top").z + - c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z + + 5.0 # Default 5 mm above top of Container + for c in containers + ] - resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) + if minimum_traverse_height_at_beginning_of_a_command is None: + minimum_traverse_height_at_beginning_of_a_command = self._channel_traversal_height - assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips) + minimal_z_positions = [ + c.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z for c in containers + ] + if min_z_endpos is None: + min_z_endpos = min(minimal_z_positions) - await self.move_all_channels_in_z_safety() + if minimum_height is None: + minimum_height = minimal_z_positions - # Check if all channels are on the same x position, then move there - x_pos = [ - resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x - for resource, offset in zip(containers, resource_offsets) - ] - if len(set(x_pos)) > 1: - raise NotImplementedError( - "automatic_surface_following is not supported for multiple x positions." + if traversal_height is None: + traversal_height = self._channel_traversal_height + + if len(set(containers)) == 1: + resource_offsets = get_wide_single_resource_liquid_op_offsets( + resource=containers[0], num_channels=len(containers) ) - await self.move_channel_x(0, x_pos[0]) - # move channels to above their y positions - y_pos = [ - resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y - for resource, offset in zip(containers, resource_offsets) - ] - await self.position_channels_in_y_direction( - {channel: y for channel, y in zip(use_channels, y_pos)} - ) - - # detect liquid heights - current_absolute_liquid_heights = await asyncio.gather( - *[ - self.move_z_drive_to_liquid_surface_using_clld( - channel_idx=channel, - lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z - + tip.total_tip_length - - tip.fitting_depth, - start_pos_search=container.get_absolute_location("c", "c", "t").z - + tip.total_tip_length - - tip.fitting_depth - + 5, + tip_presence_summary = await self.request_tip_presence() + + if not all(tip_presence_summary[ch] for ch in use_channels): + raise RuntimeError( + "All channels used for probing must have tips attached." + f" Tips present: {tip_presence_summary}, requested channels: {use_channels}" + ) + + # Create proxy aspirate operations + ops = [] + for i, c in enumerate(containers): + ops.append( + SingleChannelAspiration( + resource=c, + offset=resource_offsets[i], + tip=standard_volume_tip_with_filter(), + volume=0.0, + flow_rate=None, + liquid_height=None, + blow_out_air_volume=None, + mix=None, ) - for channel, container, tip in zip(use_channels, containers, tips) + ) + + if swap_speed is None: + swap_speed = [100.0] * len(containers) + + assert n_replicates > 0 + + replicate_summary = [] + + # TODO: merge x_chunking helper function of containers into PLR, then use it to smartly split + # containers to probe -> minimise back and forth movement in x dimension when containers are + # in different x coordinates + + for _ in range(n_replicates): + x_coords_of_ops = [c.get_location_wrt(self.deck, "c").x for c in containers] + if n_replicates > 1 and len(set(x_coords_of_ops)) > 1: + raise ValueError( + "probing is only allowed in the same x-coordinate for safety reasons." + f"given: {x_coords_of_ops}" + ) + + # Perform zero-volume aspirate operation for probing + await self.aspirate( + ops=ops, + use_channels=use_channels, + lld_mode=lld_mode, + lld_search_height=lld_search_height, + minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, + immersion_depth=[-post_detection_distance] * len(containers), + minimum_height=minimum_height, + settling_time=[0] * len(containers), + pull_out_distance_transport_air=[0] * len(containers), + transport_air_volume=[0] * len(containers), + swap_speed=swap_speed, + min_z_endpos=min_z_endpos, + ) + + all_absolute_liquid_heights = await self.request_pip_height_last_lld() + + absolute_llds_filtered_to_used_channels = [ + all_absolute_liquid_heights[i] for i in use_channels ] - ) - current_absolute_liquid_heights = await self.request_pip_height_last_lld() # type: ignore + # Compute heights relative to cavity bottom + relative_liquid_height_to_well = [] + for abs_h, resource in zip(absolute_llds_filtered_to_used_channels, containers): + bottom_z = resource.get_location_wrt(self.deck, "c", "c", z="cavity_bottom").z + relative_height = abs_h - bottom_z + # Hedge against definition mistakes (cavity bottom lower than expected) + relative_liquid_height_to_well.append(relative_height if relative_height >= 0 else 0.0) - filtered_absolute_liquid_heights = [ - current_absolute_liquid_heights[idx] for idx in use_channels - ] + # Move to specified traversal height to save time + zs = {ch: traversal_height for ch in use_channels} + await self.position_channels_in_z_direction(zs) - relative_to_well = [ - filtered_absolute_liquid_heights[i] - - resource.get_absolute_location("c", "c", "cavity_bottom").z - for i, resource in enumerate(containers) - ] + replicate_summary.append(relative_liquid_height_to_well) + + merged_channel_results = list(zip(*replicate_summary)) if move_to_z_safety_after: await self.move_all_channels_in_z_safety() - return relative_to_well + if return_mean: + return [ + round(sum(channel_results) / n_replicates, 2) for channel_results in merged_channel_results + ] + + else: + return merged_channel_results + + async def probe_liquid_volumes( + self, + containers: List[Container], + use_channels: List[int], + resource_offsets: Optional[List[Coordinate]] = None, + lld_mode: Optional[List[LLDMode]] = None, + lld_search_height: Optional[List[float]] = None, + minimum_traverse_height_at_beginning_of_a_command: Optional[float] = None, + min_z_endpos: Optional[float] = None, + traversal_height: Optional[float] = None, + post_detection_distance: float = 2.0, + swap_speed: Optional[List[float]] = None, + n_replicates: int = 3, + return_mean: bool = True, + move_to_z_safety_after: bool = True, + ) -> Union[List[float], List[Tuple[float, ...]]]: + """ + Probe liquid volumes in one or more containers using Hamilton STAR Liquid Level + Detection (LLD). + + This method performs repeated LLD-only probe operations to measure the liquid + surface height in each container and converts the measured heights into liquid + volumes using each container's geometric model + (`Container.compute_volume_from_height`). + + Only containers that support height-to-volume conversion can be used with this + method. All probing motion, safety constraints, and replicate handling are + delegated to `probe_liquid_heights`. + + Parameters + ---------- + containers: + Containers to probe. All containers must support height-to-volume conversion. + use_channels: + STAR channels to use for probing. Must correspond one-to-one with + `containers`. + resource_offsets: + Optional per-container XY offsets applied during probing. + lld_mode: + Per-channel LLD mode (1 = capacitive, 2 = pressure-based). + lld_search_height: + Absolute Z height (deck coordinates) from which the LLD search begins. + post_detection_distance: + Distance (in mm) the tip is allowed to continue moving downward after LLD + detection. + swap_speed: + Optional per-channel Z movement speed during probing. + n_replicates: + Number of repeated LLD probe operations per channel. + return_mean: + If True, return the mean liquid volume per channel. If False, return all + replicate volumes per channel. + move_to_z_safety_after: + If True, move all channels to Z safety after probing completes. + + Returns + ------- + List[float] or List[Tuple[float, ...]] + If `return_mean` is True, returns mean liquid volumes per channel. + Otherwise, returns per-channel tuples of replicate volumes. + """ + + # Validate that containers support height<->volume functions + for resource in containers: + try: + resource.compute_volume_from_height(1.0) + except NotImplementedError as e: + raise ValueError( + "probe_liquid_volumes can only be used with containers " + "that support height<->volume functions." + ) from e + + # First, probe liquid heights + merged_channel_results_liquid_heights = cast( + List[Tuple[float, ...]], + await self.probe_liquid_heights( + containers=containers, + use_channels=use_channels, + resource_offsets=resource_offsets, + lld_mode=lld_mode, + lld_search_height=lld_search_height, + minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command, + min_z_endpos=min_z_endpos, + traversal_height=traversal_height, + post_detection_distance=post_detection_distance, + swap_speed=swap_speed, + n_replicates=n_replicates, + return_mean=False, + move_to_z_safety_after=move_to_z_safety_after, + ), + ) + + merged_channel_results_volumes = [] + for channel_results in merged_channel_results_liquid_heights: + computed_volumes = [ + resource.compute_volume_from_height(h) for resource, h in zip(containers, channel_results) + ] + merged_channel_results_volumes.append(computed_volumes) + + if return_mean: + return [ + round(sum(channel_results) / n_replicates, 2) + for channel_results in merged_channel_results_volumes + ] + + return merged_channel_results_volumes async def aspirate( self, @@ -2000,12 +2229,18 @@ async def aspirate( if any(op.liquid_height is not None for op in ops): raise ValueError("Cannot use probe_liquid_height when liquid heights are set.") + if len(set(x_positions)) > 1: + raise ValueError( + "probe_liquid_height can only be used when all operations are in the same x position." + ) + liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, - tips=[cast(HamiltonTip, op.tip) for op in ops], resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + minimum_height=minimum_height, + min_z_endpos=minimum_height, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -2370,7 +2605,6 @@ async def dispense( liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, - tips=[cast(HamiltonTip, op.tip) for op in ops], resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, )