diff --git a/src/blueapi/core/event.py b/src/blueapi/core/event.py index fff9c833d..f6103cd0d 100644 --- a/src/blueapi/core/event.py +++ b/src/blueapi/core/event.py @@ -1,4 +1,5 @@ import itertools +import logging from abc import ABC, abstractmethod from collections.abc import Callable from typing import Generic, TypeVar @@ -9,6 +10,8 @@ #: Subscription token type S = TypeVar("S") +LOGGER = logging.getLogger(__name__) + class EventStream(ABC, Generic[E, S]): """ @@ -77,4 +80,10 @@ def publish(self, event: E, correlation_id: str | None = None) -> None: """ for callback in list(self._subscriptions.values()): - callback(event, correlation_id) + try: + callback(event, correlation_id) + except Exception as e: + LOGGER.error( + f"Failed to send event {event} with {correlation_id=}", + exc_info=e, + ) diff --git a/tests/unit_tests/core/test_event.py b/tests/unit_tests/core/test_event.py index 4f39ebc11..24ccb5c85 100644 --- a/tests/unit_tests/core/test_event.py +++ b/tests/unit_tests/core/test_event.py @@ -2,6 +2,7 @@ from concurrent.futures import Future from dataclasses import dataclass from queue import Queue +from unittest import mock import pytest @@ -76,6 +77,21 @@ def test_correlation_id(publisher: EventPublisher[MyEvent]) -> None: assert f.result(timeout=_TIMEOUT) == correlation_id +def test_callback_exceptions_are_contained(publisher: EventPublisher[MyEvent]): + event = MyEvent("foo") + c_id = "bar" + + # First call should raise exception, next should be fine + handler = mock.Mock(side_effect=[ValueError("Bad Event"), ()]) + publisher.subscribe(handler) + publisher.subscribe(handler) + + publisher.publish(event, c_id) + + # Both handlers should be called and the exception should not be raised from publish + handler.assert_has_calls([mock.call(event, c_id), mock.call(event, c_id)]) + + def _drain(queue: Queue) -> Iterable: while not queue.empty(): yield queue.get_nowait()