From 005c3c0129caa28e7512656daf34211612e6c039 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 12 Dec 2025 15:38:39 +0000 Subject: [PATCH 1/2] Unsubscribe the callback that unsubscribes the tiled writer When the tiled writer subscribes to the run_engine, another callback subscribes to the task worker to remove the writer at the end of the plan/task. This subscriber should also be removed at the end of the task. To avoid creating a third subscriber and the same issue again, rely on the unsubscribe handle created after the callback is defined being available in the closure when it runs. There is a very slight race condition here if the task complete event is received before the subscribe method returns but until it happens we'll assume it's unlikely. --- src/blueapi/service/interface.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 74a95291c..60b93250e 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -173,17 +173,20 @@ def begin_task( task: WorkerTask, pass_through_headers: Mapping[str, str] | None = None ) -> WorkerTask: """Trigger a task. Will fail if the worker is busy""" - if nt := context().numtracker: + + active_worker = worker() + active_context = context() + if nt := active_context.numtracker: nt.set_headers(pass_through_headers or {}) - if tiled_config := context().tiled_conf: + if tiled_config := active_context.tiled_conf: # Tiled queries the root node, so must create an authorized client tiled_client = from_uri( str(tiled_config.url), api_key=tiled_config.api_key, headers=pass_through_headers, ) - tiled_writer_token = context().run_engine.subscribe( + tiled_writer_token = active_context.run_engine.subscribe( TiledWriter(tiled_client, batch_size=1) ) @@ -195,12 +198,15 @@ def remove_callback_when_task_finished( and event.task_status.task_id == task.task_id and event.task_status.task_complete ): - context().run_engine.unsubscribe(tiled_writer_token) + active_context.run_engine.unsubscribe(tiled_writer_token) + active_worker.worker_events.unsubscribe(remove_callback) - worker().worker_events.subscribe(remove_callback_when_task_finished) + remove_callback = active_worker.worker_events.subscribe( + remove_callback_when_task_finished + ) if task.task_id is not None: - worker().begin_task(task.task_id) + active_worker.begin_task(task.task_id) return task From f285529b4e69c7908f0af91d56695c3b41f5431e Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 12 Dec 2025 17:47:01 +0000 Subject: [PATCH 2/2] Test removal of tiled subscriber --- tests/unit_tests/service/test_interface.py | 51 +++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_interface.py b/tests/unit_tests/service/test_interface.py index 565ad1c39..e3cda0b5a 100644 --- a/tests/unit_tests/service/test_interface.py +++ b/tests/unit_tests/service/test_interface.py @@ -27,6 +27,7 @@ PlanSource, ScratchConfig, StompConfig, + TiledConfig, ) from blueapi.core.context import BlueskyContext from blueapi.service import interface @@ -42,7 +43,7 @@ ) from blueapi.utils.invalid_config_error import InvalidConfigError from blueapi.utils.path_provider import StartDocumentPathProvider -from blueapi.worker.event import TaskStatusEnum, WorkerState +from blueapi.worker.event import TaskStatus, TaskStatusEnum, WorkerEvent, WorkerState from blueapi.worker.task import Task from blueapi.worker.task_worker import TrackableTask @@ -365,6 +366,54 @@ def test_get_task_by_id( ) +@patch("blueapi.service.interface.TiledWriter") +@patch("blueapi.service.interface.from_uri") +@patch("blueapi.service.interface.context") +@patch("blueapi.service.interface.worker") +def test_remove_tiled_subscriber(worker, context, from_uri, writer): + task = WorkerTask(task_id="foo_bar") + context().numtracker = None + context().tiled_conf = TiledConfig() + context().run_engine.subscribe.return_value = 17 + worker().worker_events.subscribe.return_value = 42 + + interface.begin_task(task) + + writer.assert_called_once_with(from_uri(), batch_size=1) + context().run_engine.subscribe.assert_called_once_with(writer()) + worker().worker_events.subscribe.assert_called_once() + + inner_callback = worker().worker_events.subscribe.call_args.args[0] + + inner_callback( + WorkerEvent( + state=WorkerState.RUNNING, + task_status=TaskStatus( + task_id="foo_bar", + task_complete=False, + task_failed=False, + ), + ), + "c_id", + ) + context().run_engine.unsubscribe.assert_not_called() + worker().worker_events.unsubscribe.assert_not_called() + + inner_callback( + WorkerEvent( + state=WorkerState.IDLE, + task_status=TaskStatus( + task_id="foo_bar", + task_complete=True, + task_failed=False, + ), + ), + "c_id", + ) + context().run_engine.unsubscribe.assert_called_once_with(17) + worker().worker_events.unsubscribe.assert_called_once_with(42) + + def test_get_oidc_config(oidc_config: OIDCConfig): interface.set_config(ApplicationConfig(oidc=oidc_config)) assert interface.get_oidc_config() == oidc_config