Add ObjectTracker module with DepthAI integration and software tracker#1
Add ObjectTracker module with DepthAI integration and software tracker#1
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces an ObjectTracker module that provides persistent object tracking across frames with two implementation modes: on-device tracking using DepthAI's ObjectTracker node, and host-side software tracking using IoU-based matching. The module accepts Detection objects and outputs TrackedObject instances with persistent IDs, tracking status, and smoothed spatial coordinates.
Changes:
- Implements data classes for Detection (input) and TrackedObject (output) with tracking status enum
- Adds on-device tracker integration with DepthAI pipeline configuration and tracklet parsing
- Implements software-based IoU tracker with exponential moving average coordinate smoothing
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| modules/object_tracker/init.py | Exports public API including TrackedObject, Detection, tracker functions, and worker functions |
| modules/object_tracker/tracked_object.py | Defines TrackedObject dataclass and TrackingStatus enum for output representation |
| modules/object_tracker/detection.py | Defines Detection dataclass as input interface from detection team |
| modules/object_tracker/object_tracker.py | Implements DepthAI ObjectTracker node configuration and tracklet parsing logic |
| modules/object_tracker/object_tracker_worker.py | Provides worker functions for on-device tracker pipeline integration |
| modules/object_tracker/software_tracker.py | Implements host-side IoU-based tracker with ID persistence and coordinate smoothing |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def _find_best_match(self, det: Detection) -> Optional[int]: | ||
| """Find the track with highest IoU above threshold.""" | ||
| best_id = None | ||
| best_iou = self._iou_threshold | ||
|
|
||
| for track_id, track in self._tracks.items(): | ||
| iou = self._compute_iou( | ||
| det.xmin, det.ymin, det.xmax, det.ymax, | ||
| track.xmin, track.ymin, track.xmax, track.ymax, | ||
| ) | ||
| if iou > best_iou: | ||
| best_iou = iou | ||
| best_id = track_id | ||
|
|
||
| return best_id |
There was a problem hiding this comment.
The IoU-based matching doesn't consider object labels, which could cause incorrect associations when objects of different classes overlap. For example, if a "person" detection and a "car" detection have high IoU, they might be incorrectly matched. Consider adding a label matching constraint: only match detections to tracks with the same label.
| def object_tracker_run( | ||
| pipeline: dai.Pipeline, | ||
| spatial_detection_network: dai.node.SpatialDetectionNetwork, | ||
| label_map: List[str], | ||
| frame_width: int, | ||
| frame_height: int, | ||
| output_queue, # multiprocessing.Queue[List[TrackedObject]] | ||
| tracker_type: str = "SHORT_TERM_IMAGELESS", | ||
| labels_to_track: List[int] = None, | ||
| ) -> None: |
There was a problem hiding this comment.
The function name object_tracker_run is misleading - it only configures the tracker node but doesn't run anything. The actual read loop is in a separate function object_tracker_read_loop. Consider renaming to configure_object_tracker or setup_object_tracker to better reflect its behavior, or combine both functions if they're always used together.
| while True: | ||
| tracklets_data = tracklet_queue.get() # blocks until next frame | ||
|
|
||
| tracked_objects = parse_tracklets( | ||
| tracklets_data=tracklets_data, | ||
| label_map=label_map, | ||
| frame_width=frame_width, | ||
| frame_height=frame_height, | ||
| ) | ||
|
|
||
| if tracked_objects: | ||
| logger.debug( | ||
| "Frame produced %d tracked objects: %s", | ||
| len(tracked_objects), | ||
| [ | ||
| f"id={t.object_id} status={t.status.value}" | ||
| for t in tracked_objects | ||
| ], | ||
| ) | ||
|
|
||
| output_queue.put(tracked_objects) |
There was a problem hiding this comment.
The infinite loop in object_tracker_read_loop has no error handling or exit mechanism. If an exception occurs (e.g., device disconnection, queue errors), the worker will crash without cleanup. Consider adding try-except blocks and a mechanism to gracefully exit the loop (e.g., checking a stop event or catching specific exceptions).
| class SoftwareTracker: | ||
| """ | ||
| Host-side object tracker with ID assignment and coordinate smoothing. | ||
|
|
||
| Accepts Detection objects from the detection team and outputs | ||
| TrackedObject with persistent IDs and smoothed coordinates. | ||
|
|
||
| Usage:: | ||
|
|
||
| tracker = SoftwareTracker() | ||
|
|
||
| # Each frame: | ||
| detections = [Detection(...), Detection(...)] | ||
| tracked = tracker.update(detections) | ||
| # tracked is List[TrackedObject] | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| iou_threshold: float = 0.3, | ||
| max_lost_frames: float = 0.5, # seconds | ||
| smoothing_alpha: float = 0.4, | ||
| ) -> None: | ||
| """ | ||
| Args: | ||
| iou_threshold: Min IoU to match detection to existing track. | ||
| max_lost_frames: Seconds before a lost object is removed. | ||
| smoothing_alpha: EMA alpha for coordinate smoothing (0-1). | ||
| Higher = more weight on new detections. | ||
| """ | ||
| self._iou_threshold = iou_threshold | ||
| self._max_lost_time = max_lost_frames | ||
| self._alpha = smoothing_alpha | ||
| self._next_id = 1 | ||
| self._tracks: Dict[int, _TrackedState] = {} | ||
|
|
||
| def update(self, detections: List[Detection]) -> List[TrackedObject]: | ||
| """ | ||
| Process a new frame of detections. | ||
|
|
||
| Args: | ||
| detections: List of Detection objects from this frame. | ||
|
|
||
| Returns: | ||
| List of TrackedObject with persistent IDs and smoothed coords. | ||
| """ | ||
| now = time.time() | ||
| matched_track_ids = set() | ||
| results: List[TrackedObject] = [] | ||
|
|
||
| # --- match detections to existing tracks --- | ||
| for det in detections: | ||
| best_id = self._find_best_match(det) | ||
|
|
||
| if best_id is not None: | ||
| # update existing track | ||
| track = self._tracks[best_id] | ||
| self._update_track(track, det, now) | ||
| matched_track_ids.add(best_id) | ||
| else: | ||
| # create new track | ||
| new_id = self._next_id | ||
| self._next_id += 1 | ||
| self._tracks[new_id] = _TrackedState( | ||
| object_id=new_id, | ||
| label=det.label, | ||
| confidence=det.confidence, | ||
| x=det.x, | ||
| y=det.y, | ||
| z=det.z, | ||
| xmin=det.xmin, | ||
| ymin=det.ymin, | ||
| xmax=det.xmax, | ||
| ymax=det.ymax, | ||
| last_seen=now, | ||
| status=TrackingStatus.NEW, | ||
| ) | ||
| matched_track_ids.add(new_id) | ||
|
|
||
| # --- mark unmatched tracks as LOST, remove old ones --- | ||
| to_remove = [] | ||
| for track_id, track in self._tracks.items(): | ||
| if track_id not in matched_track_ids: | ||
| track.status = TrackingStatus.LOST | ||
| if now - track.last_seen > self._max_lost_time: | ||
| to_remove.append(track_id) | ||
|
|
||
| for track_id in to_remove: | ||
| del self._tracks[track_id] | ||
|
|
||
| # --- convert to TrackedObject output --- | ||
| for track in self._tracks.values(): | ||
| results.append( | ||
| TrackedObject( | ||
| object_id=track.object_id, | ||
| status=track.status, | ||
| label=track.label, | ||
| confidence=track.confidence, | ||
| x=track.x, | ||
| y=track.y, | ||
| z=track.z, | ||
| bbox_x=int(track.xmin), | ||
| bbox_y=int(track.ymin), | ||
| bbox_width=int(track.xmax - track.xmin), | ||
| bbox_height=int(track.ymax - track.ymin), | ||
| ) | ||
| ) | ||
|
|
||
| return results | ||
|
|
||
| def _find_best_match(self, det: Detection) -> Optional[int]: | ||
| """Find the track with highest IoU above threshold.""" | ||
| best_id = None | ||
| best_iou = self._iou_threshold | ||
|
|
||
| for track_id, track in self._tracks.items(): | ||
| iou = self._compute_iou( | ||
| det.xmin, det.ymin, det.xmax, det.ymax, | ||
| track.xmin, track.ymin, track.xmax, track.ymax, | ||
| ) | ||
| if iou > best_iou: | ||
| best_iou = iou | ||
| best_id = track_id | ||
|
|
||
| return best_id | ||
|
|
||
| def _update_track( | ||
| self, | ||
| track: _TrackedState, | ||
| det: Detection, | ||
| now: float, | ||
| ) -> None: | ||
| """Update track with new detection, applying EMA smoothing.""" | ||
| a = self._alpha | ||
|
|
||
| # smooth spatial coords | ||
| track.x = a * det.x + (1 - a) * track.x | ||
| track.y = a * det.y + (1 - a) * track.y | ||
| track.z = a * det.z + (1 - a) * track.z | ||
|
|
||
| # smooth bbox | ||
| track.xmin = a * det.xmin + (1 - a) * track.xmin | ||
| track.ymin = a * det.ymin + (1 - a) * track.ymin | ||
| track.xmax = a * det.xmax + (1 - a) * track.xmax | ||
| track.ymax = a * det.ymax + (1 - a) * track.ymax | ||
|
|
||
| # update metadata | ||
| track.label = det.label | ||
| track.confidence = det.confidence | ||
| track.last_seen = now | ||
| track.frames_tracked += 1 | ||
| track.status = TrackingStatus.TRACKED | ||
|
|
||
| @staticmethod | ||
| def _compute_iou( | ||
| x1min: float, y1min: float, x1max: float, y1max: float, | ||
| x2min: float, y2min: float, x2max: float, y2max: float, | ||
| ) -> float: | ||
| """Compute Intersection over Union of two bounding boxes.""" | ||
| xi_min = max(x1min, x2min) | ||
| yi_min = max(y1min, y2min) | ||
| xi_max = min(x1max, x2max) | ||
| yi_max = min(y1max, y2max) | ||
|
|
||
| if xi_max <= xi_min or yi_max <= yi_min: | ||
| return 0.0 | ||
|
|
||
| inter_area = (xi_max - xi_min) * (yi_max - yi_min) | ||
| box1_area = (x1max - x1min) * (y1max - y1min) | ||
| box2_area = (x2max - x2min) * (y2max - y2min) | ||
| union_area = box1_area + box2_area - inter_area | ||
|
|
||
| if union_area <= 0: | ||
| return 0.0 | ||
|
|
||
| return inter_area / union_area |
There was a problem hiding this comment.
The PR description indicates unit tests for SoftwareTracker are planned but not yet implemented. The test plan shows this as unchecked. Consider implementing the unit tests before merging to ensure the IoU tracking logic, ID persistence, and coordinate smoothing work correctly.
| pipeline: dai.Pipeline, | ||
| spatial_detection_network: dai.node.SpatialDetectionNetwork, | ||
| tracker_type: str = "SHORT_TERM_IMAGELESS", | ||
| labels_to_track: List[int] = None, |
There was a problem hiding this comment.
The parameter labels_to_track: List[int] = None should be typed as Optional[List[int]] = None for correctness. While Python allows this, type checkers like mypy will flag this as an error since None is not a valid List[int]. Import Optional from typing and update the type hint.
| frame_height: int, | ||
| output_queue, # multiprocessing.Queue[List[TrackedObject]] | ||
| tracker_type: str = "SHORT_TERM_IMAGELESS", | ||
| labels_to_track: List[int] = None, |
There was a problem hiding this comment.
The parameter labels_to_track: List[int] = None should be typed as Optional[List[int]] = None for correctness. While Python allows this, type checkers like mypy will flag this as an error since None is not a valid List[int]. Import Optional from typing and update the type hint.
| max_lost_frames: Seconds before a lost object is removed. | ||
| smoothing_alpha: EMA alpha for coordinate smoothing (0-1). | ||
| Higher = more weight on new detections. | ||
| """ |
There was a problem hiding this comment.
The smoothing_alpha parameter should be validated to ensure it's in the range [0, 1]. Values outside this range will produce unexpected smoothing behavior. Consider adding a check: if not 0 <= smoothing_alpha <= 1: raise ValueError("smoothing_alpha must be between 0 and 1").
| """ | |
| """ | |
| if not 0 <= smoothing_alpha <= 1: | |
| raise ValueError("smoothing_alpha must be between 0 and 1") |
| max_lost_frames: Seconds before a lost object is removed. | ||
| smoothing_alpha: EMA alpha for coordinate smoothing (0-1). | ||
| Higher = more weight on new detections. | ||
| """ |
There was a problem hiding this comment.
The iou_threshold parameter should be validated to ensure it's in a reasonable range [0, 1]. IoU values are always between 0 and 1, so a threshold outside this range would be meaningless. Consider adding validation: if not 0 <= iou_threshold <= 1: raise ValueError("iou_threshold must be between 0 and 1").
| """ | |
| """ | |
| if not 0.0 <= iou_threshold <= 1.0: | |
| raise ValueError("iou_threshold must be between 0 and 1") |
| max_lost_frames: Seconds before a lost object is removed. | ||
| smoothing_alpha: EMA alpha for coordinate smoothing (0-1). | ||
| Higher = more weight on new detections. | ||
| """ |
There was a problem hiding this comment.
The max_lost_frames (which represents seconds) should be validated to ensure it's non-negative. A negative value would cause tracks to be immediately removed. Consider adding validation: if max_lost_frames < 0: raise ValueError("max_lost_frames must be non-negative").
| """ | |
| """ | |
| if max_lost_frames < 0: | |
| raise ValueError("max_lost_frames must be non-negative") |
|
|
||
| import time | ||
| from typing import List, Dict, Optional | ||
| from dataclasses import dataclass, field |
There was a problem hiding this comment.
Import of 'field' is not used.
| from dataclasses import dataclass, field | |
| from dataclasses import dataclass |
Summary
Test plan