diff --git a/src/blueapi/core/context.py b/src/blueapi/core/context.py index 31764b286..209daaca2 100644 --- a/src/blueapi/core/context.py +++ b/src/blueapi/core/context.py @@ -123,7 +123,8 @@ def __post_init__(self, configuration: ApplicationConfig | None): path_provider = StartDocumentPathProvider() set_path_provider(path_provider) - self.run_engine.subscribe(path_provider.update_run, "start") + self.run_engine.subscribe(path_provider.run_start, "start") + self.run_engine.subscribe(path_provider.run_stop, "stop") def _update_scan_num(md: dict[str, Any]) -> int: scan = numtracker.create_scan( diff --git a/src/blueapi/utils/path_provider.py b/src/blueapi/utils/path_provider.py index e504325b5..902eed1d5 100644 --- a/src/blueapi/utils/path_provider.py +++ b/src/blueapi/utils/path_provider.py @@ -1,6 +1,6 @@ from pathlib import Path -from event_model import RunStart +from event_model.basemodels import RunStart, RunStop from ophyd_async.core import PathInfo, PathProvider DEFAULT_TEMPLATE = "{device_name}-{instrument}-{scan_id}" @@ -17,15 +17,17 @@ class StartDocumentPathProvider(PathProvider): """ def __init__(self) -> None: - self._doc = {} + self._doc: RunStart | None = None - def update_run(self, name: str, start_doc: RunStart) -> None: - """Cache a start document. - - This can be plugged into the run engine's subscribe method. - """ + def run_start(self, name: str, start_document: RunStart) -> None: if name == "start": - self._doc = start_doc + if self._doc is None: + self._doc = start_document + + def run_stop(self, name: str, stop_document: RunStop) -> None: + if name == "stop": + if self._doc is not None and stop_document.run_start == self._doc.uid: + self._doc = None def __call__(self, device_name: str | None = None) -> PathInfo: """Returns the directory path and filename for a given data_session. @@ -36,7 +38,14 @@ def __call__(self, device_name: str | None = None) -> PathInfo: If you do not provide a data_session_directory it will default to "/tmp". """ - template = self._doc.get("data_file_path_template", DEFAULT_TEMPLATE) - sub_path = template.format_map(self._doc | {"device_name": device_name}) - data_session_directory = Path(self._doc.get("data_session_directory", "/tmp")) - return PathInfo(directory_path=data_session_directory, filename=sub_path) + if self._doc is None: + raise AttributeError( + "Start document not found. This call must be made inside a run." + ) + else: + doc = self._doc.model_dump() + template = doc.get("data_file_path_template", DEFAULT_TEMPLATE) + data_session_directory = Path(doc.get("data_session_directory", "/tmp")) + filename = template.format_map(doc | {"device_name": device_name}) + + return PathInfo(directory_path=data_session_directory, filename=filename) diff --git a/tests/unit_tests/utils/test_path_provider.py b/tests/unit_tests/utils/test_path_provider.py index 80e4a95d4..bcec86e9a 100644 --- a/tests/unit_tests/utils/test_path_provider.py +++ b/tests/unit_tests/utils/test_path_provider.py @@ -1,44 +1,46 @@ from pathlib import PosixPath import pytest -from event_model.documents import RunStart +from event_model.basemodels import RunStart, RunStop from ophyd_async.core import PathInfo from blueapi.utils.path_provider import StartDocumentPathProvider @pytest.fixture -def start_doc_default_template() -> dict: - return { - "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", - "time": 1741264729.96875, - "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, - "data_session": "ab123", - "instrument": "p01", - "data_session_directory": "/p01/ab123", - "scan_id": 22, - "plan_type": "generator", - "plan_name": "count", - "detectors": ["det"], - "num_points": 1, - "num_intervals": 0, - "plan_args": { - "detectors": [ - "" # NOQA: E501 - ], - "num": 1, - "delay": 0.0, - }, - "hints": {"dimensions": [[["time"], "primary"]]}, - "shape": [1], - } +def start_doc_default_template() -> RunStart: + return RunStart.model_validate( + { + "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", + "time": 1741264729.96875, + "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, + "data_session": "ab123", + "instrument": "p01", + "data_session_directory": "/p01/ab123", + "scan_id": 22, + "plan_type": "generator", + "plan_name": "count", + "detectors": ["det"], + "num_points": 1, + "num_intervals": 0, + "plan_args": { + "detectors": [ + "" # NOQA: E501 + ], + "num": 1, + "delay": 0.0, + }, + "hints": {"dimensions": [[["time"], "primary"]]}, + "shape": [1], + } + ) def test_start_document_path_provider_with_default_template_returns_correct_path_info( start_doc_default_template: RunStart, ): pp = StartDocumentPathProvider() - pp.update_run(name="start", start_doc=start_doc_default_template) + pp.run_start(name="start", start_document=start_doc_default_template) path = pp("det") assert path == PathInfo( @@ -49,38 +51,40 @@ def test_start_document_path_provider_with_default_template_returns_correct_path @pytest.fixture -def start_doc_custom_template() -> dict: - return { - "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", - "time": 1741264729.96875, - "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, - "data_session": "ab123", - "instrument": "p01", - "data_session_directory": "/p01/ab123", - "scan_id": 22, - "data_file_path_template": "{device_name}-{instrument}-{scan_id}-custom", - "plan_type": "generator", - "plan_name": "count", - "detectors": ["det"], - "num_points": 1, - "num_intervals": 0, - "plan_args": { - "detectors": [ - "" # NOQA: E501 - ], - "num": 1, - "delay": 0.0, - }, - "hints": {"dimensions": [[["time"], "primary"]]}, - "shape": [1], - } +def start_doc_custom_template() -> RunStart: + return RunStart.model_validate( + { + "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", + "time": 1741264729.96875, + "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, + "data_session": "ab123", + "instrument": "p01", + "data_session_directory": "/p01/ab123", + "scan_id": 22, + "data_file_path_template": "{device_name}-{instrument}-{scan_id}-custom", + "plan_type": "generator", + "plan_name": "count", + "detectors": ["det"], + "num_points": 1, + "num_intervals": 0, + "plan_args": { + "detectors": [ + "" # NOQA: E501 + ], + "num": 1, + "delay": 0.0, + }, + "hints": {"dimensions": [[["time"], "primary"]]}, + "shape": [1], + } + ) def test_start_document_path_provider_with_custom_template_returns_correct_path_info( start_doc_custom_template: RunStart, ): pp = StartDocumentPathProvider() - pp.update_run(name="start", start_doc=start_doc_custom_template) + pp.run_start(name="start", start_document=start_doc_custom_template) path = pp("det") assert path == PathInfo( @@ -91,108 +95,116 @@ def test_start_document_path_provider_with_custom_template_returns_correct_path_ @pytest.fixture -def start_doc_missing_instrument() -> dict: - return { - "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", - "time": 1741264729.96875, - "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, - "data_session": "ab123", - "data_session_directory": "/p01/ab123", - "scan_id": 22, - "plan_type": "generator", - "plan_name": "count", - "detectors": ["det"], - "num_points": 1, - "num_intervals": 0, - "plan_args": { - "detectors": [ - "" # NOQA: E501 - ], - "num": 1, - "delay": 0.0, - }, - "hints": {"dimensions": [[["time"], "primary"]]}, - "shape": [1], - } +def start_doc_missing_instrument() -> RunStart: + return RunStart.model_validate( + { + "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", + "time": 1741264729.96875, + "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, + "data_session": "ab123", + "data_session_directory": "/p01/ab123", + "scan_id": 22, + "plan_type": "generator", + "plan_name": "count", + "detectors": ["det"], + "num_points": 1, + "num_intervals": 0, + "plan_args": { + "detectors": [ + "" # NOQA: E501 + ], + "num": 1, + "delay": 0.0, + }, + "hints": {"dimensions": [[["time"], "primary"]]}, + "shape": [1], + } + ) def test_start_document_path_provider_fails_with_missing_instrument( start_doc_missing_instrument: RunStart, ): pp = StartDocumentPathProvider() - pp.update_run(name="start", start_doc=start_doc_missing_instrument) + pp.run_start(name="start", start_document=start_doc_missing_instrument) with pytest.raises(KeyError, match="'instrument'"): pp("det") @pytest.fixture -def start_doc_missing_scan_id() -> dict: - return { - "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", - "time": 1741264729.96875, - "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, - "data_session": "ab123", - "instrument": "p01", - "data_session_directory": "/p01/ab123", - "plan_type": "generator", - "plan_name": "count", - "detectors": ["det"], - "num_points": 1, - "num_intervals": 0, - "plan_args": { - "detectors": [ - "" # NOQA: E501 - ], - "num": 1, - "delay": 0.0, - }, - "hints": {"dimensions": [[["time"], "primary"]]}, - "shape": [1], - } - - -def test_start_document_path_provider_fails_with_missing_scan_id( +def start_doc_missing_scan_id() -> RunStart: + return RunStart.model_validate( + { + "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", + "time": 1741264729.96875, + "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, + "data_session": "ab123", + "instrument": "p01", + "data_session_directory": "/p01/ab123", + "plan_type": "generator", + "plan_name": "count", + "detectors": ["det"], + "num_points": 1, + "num_intervals": 0, + "plan_args": { + "detectors": [ + "" # NOQA: E501 + ], + "num": 1, + "delay": 0.0, + }, + "hints": {"dimensions": [[["time"], "primary"]]}, + "shape": [1], + } + ) + + +def test_start_document_path_provider_missing_scan_id_defaults_to_0( start_doc_missing_scan_id: RunStart, ): pp = StartDocumentPathProvider() - pp.update_run(name="start", start_doc=start_doc_missing_scan_id) + pp.run_start(name="start", start_document=start_doc_missing_scan_id) + path = pp("det") - with pytest.raises(KeyError, match="'scan_id'"): - pp("det") + assert path == PathInfo( + directory_path=PosixPath("/p01/ab123"), filename="det-p01-0", create_dir_depth=0 + ) @pytest.fixture -def start_doc_default_data_session_directory() -> dict: - return { - "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", - "time": 1741264729.96875, - "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, - "data_session": "ab123", - "instrument": "p01", - "scan_id": 22, - "plan_type": "generator", - "plan_name": "count", - "detectors": ["det"], - "num_points": 1, - "num_intervals": 0, - "plan_args": { - "detectors": [ - "" # NOQA: E501 - ], - "num": 1, - "delay": 0.0, - }, - "hints": {"dimensions": [[["time"], "primary"]]}, - "shape": [1], - } +def start_doc_default_data_session_directory() -> RunStart: + return RunStart.model_validate( + { + "uid": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", + "time": 1741264729.96875, + "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, + "data_session": "ab123", + "instrument": "p01", + "scan_id": 22, + "plan_type": "generator", + "plan_name": "count", + "detectors": ["det"], + "num_points": 1, + "num_intervals": 0, + "plan_args": { + "detectors": [ + "" # NOQA: E501 + ], + "num": 1, + "delay": 0.0, + }, + "hints": {"dimensions": [[["time"], "primary"]]}, + "shape": [1], + } + ) def test_start_document_path_provider_sets_data_session_directory_default_to_tmp( start_doc_default_data_session_directory: RunStart, ): pp = StartDocumentPathProvider() - pp.update_run(name="start", start_doc=start_doc_default_data_session_directory) + pp.run_start(name="start", start_document=start_doc_default_data_session_directory) path = pp("det") assert path == PathInfo( @@ -200,10 +212,144 @@ def test_start_document_path_provider_sets_data_session_directory_default_to_tmp ) -def test_start_document_path_provider_update_called_with_different_document_skips( +@pytest.fixture +def stop_doc_default_template() -> RunStop: + return RunStop.model_validate( + { + "run_start": "27c48d2f-d8c6-4ac0-8146-fedf467ce11f", + "time": 1741264732.96875, + "uid": "401ad197-5456-4a7d-ba5b-9cf8ad38d914", + "exit_status": "success", + "reason": "", + } + ) + + +def test_start_document_path_provider_run_start_called_with_different_document_skips( + stop_doc_default_template: RunStop, +): + pp = StartDocumentPathProvider() + pp.run_start(name="stop", start_document=stop_doc_default_template) # type: ignore + + assert pp._doc is None + + +def test_start_document_path_provider_run_stop_called_with_different_document_skips( + start_doc_default_template: RunStart, +): + pp = StartDocumentPathProvider() + pp.run_stop(name="start", stop_document=start_doc_default_template) # type: ignore + + assert pp._doc is None + + +@pytest.fixture +def start_doc() -> RunStart: + return RunStart.model_validate( + { + "uid": "fa2feced-4098-4c0e-869d-285d2a69c24a", + "time": 1690463918.3893268, + "versions": {"ophyd": "1.10.0", "bluesky": "1.13"}, + "data_session": "ab123", + "instrument": "p02", + "data_session_directory": "/p02/ab123", + "scan_id": 50, + "plan_type": "generator", + "plan_name": "count", + "detectors": ["det"], + "num_points": 1, + "num_intervals": 0, + "plan_args": { + "detectors": [ + "" # NOQA: E501 + ], + "num": 1, + "delay": 0.0, + }, + "hints": {"dimensions": [[["time"], "primary"]]}, + "shape": [1], + } + ) + + +@pytest.fixture +def stop_doc() -> RunStop: + return RunStop.model_validate( + { + "run_start": "fa2feced-4098-4c0e-869d-285d2a69c24a", + "time": 1690463920.3893268, + "uid": "401ad197-5456-4a7d-ba5b-9cf8ad38d914", + "exit_status": "success", + "reason": "", + "num_events": {"primary": 1}, + } + ) + + +def test_start_document_path_provider_start_doc_persists_until_stop_with_matching_id( + start_doc: RunStart, + stop_doc: RunStop, + start_doc_default_template: RunStart, + stop_doc_default_template: RunStop, +): + pp = StartDocumentPathProvider() + pp.run_start(name="start", start_document=start_doc) + + assert isinstance(pp._doc, RunStart) + assert pp._doc == start_doc + assert pp._doc.uid == "fa2feced-4098-4c0e-869d-285d2a69c24a" + + pp.run_start(name="start", start_document=start_doc_default_template) + assert pp._doc == start_doc + assert pp._doc.uid == "fa2feced-4098-4c0e-869d-285d2a69c24a" + + pp.run_stop(name="stop", stop_document=stop_doc_default_template) + assert pp._doc == start_doc + assert pp._doc.uid == "fa2feced-4098-4c0e-869d-285d2a69c24a" + + pp.run_stop(name="stop", stop_document=stop_doc) + assert pp._doc is None + + +def test_start_document_path_provider_nested_runs_use_parent_run_info( start_doc_default_template: RunStart, + stop_doc_default_template: RunStop, + start_doc: RunStart, + stop_doc: RunStop, ): pp = StartDocumentPathProvider() - pp.update_run(name="descriptor", start_doc=start_doc_default_template) + pp.run_start(name="start", start_document=start_doc_default_template) + parent_path_info = pp("det") + + assert isinstance(pp._doc, RunStart) + assert pp._doc == start_doc_default_template + assert pp._doc.uid == "27c48d2f-d8c6-4ac0-8146-fedf467ce11f" + assert parent_path_info == PathInfo( + directory_path=PosixPath("/p01/ab123"), + filename="det-p01-22", + create_dir_depth=0, + ) + + pp.run_start(name="start", start_document=start_doc) + assert pp._doc == start_doc_default_template + assert pp._doc.uid == "27c48d2f-d8c6-4ac0-8146-fedf467ce11f" - assert pp._doc == {} + assert pp("det") == parent_path_info + + pp.run_stop(name="stop", stop_document=stop_doc) + assert pp._doc == start_doc_default_template + assert pp._doc.uid == "27c48d2f-d8c6-4ac0-8146-fedf467ce11f" + + assert pp("det") == parent_path_info + + pp.run_stop(name="stop", stop_document=stop_doc_default_template) + assert pp._doc is None + + +def test_start_document_path_provider_called_without_start_raises(): + pp = StartDocumentPathProvider() + with pytest.raises( + AttributeError, + match="Start document not found. This call must be made inside a run.", + ): + pp("det")