From c1425d7663e90c7c1352a91196e02fdc31873144 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 13:45:46 -0800 Subject: [PATCH 01/23] Adding placeholders for dichroism beamline globus endpoints --- config.yml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/config.yml b/config.yml index 3f26a4f..0a69522 100644 --- a/config.yml +++ b/config.yml @@ -1,6 +1,32 @@ globus: globus_endpoints: + # Dichroism (4.0.2 and 6.3.1) ENDPOINTS + + bl402-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: TBD + name: bl402-compute-dtn + + bl402-nersc_alsdev_raw: + root_path: /global/cfs/cdirs/als/data_mover/4.0.2/raw + uri: nersc.gov + uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 + name: bl402-nersc_alsdev_raw + + bl631-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: 23af478e-d459-4e78-9753-5091b5fb432a + name: bl631-compute-dtn + + bl631-nersc_alsdev_raw: + root_path: /global/cfs/cdirs/als/data_mover/6.3.1/raw + uri: nersc.gov + uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 + name: bl631-nersc_alsdev_raw + # 7.0.1.2 ENDPOINTS nersc7012: @@ -46,6 +72,7 @@ globus: uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-beegfs-data + # 8.3.2 ENDPOINTS spot832: From 935da1b44eb859518d13d3192edad7ffa0907778 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 13:48:32 -0800 Subject: [PATCH 02/23] adding __init__.py --- orchestration/flows/dichroism/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 orchestration/flows/dichroism/__init__.py diff --git a/orchestration/flows/dichroism/__init__.py b/orchestration/flows/dichroism/__init__.py new file mode 100644 index 0000000..e69de29 From a36aadcd83734359d2ece86d218d1577d4f2375c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 13:49:06 -0800 Subject: [PATCH 03/23] Adding config.py for ConfigDichroism, which get the endpoints for 4.0.2 and 6.3.1 --- orchestration/flows/dichroism/config.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 orchestration/flows/dichroism/config.py diff --git a/orchestration/flows/dichroism/config.py b/orchestration/flows/dichroism/config.py new file mode 100644 index 0000000..643943c --- /dev/null +++ b/orchestration/flows/dichroism/config.py @@ -0,0 +1,15 @@ +from globus_sdk import TransferClient +from orchestration.globus import transfer + + +# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged) +class ConfigDichroism: + def __init__(self) -> None: + config = transfer.get_config() + self.endpoints = transfer.build_endpoints(config) + self.apps = transfer.build_apps(config) + self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) + self.bl402_compute_dtn = self.endpoints["bl402-compute-dtn"] + self.bl402_nersc_alsdev_raw = self.endpoints["bl402-nersc_alsdev_raw"] + self.bl631_compute_dtn = self.endpoints["bl631-compute-dtn"] + self.bl631_nersc_alsdev_raw = self.endpoints["bl631-nersc_alsdev_raw"] From 7934538d588f4638fbedd026394b24a084cacb64 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 13:49:49 -0800 Subject: [PATCH 04/23] Adding move.py which has separate move tasks/flows for 4.0.2 and 6.3.1 --- orchestration/flows/dichroism/move.py | 315 ++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 orchestration/flows/dichroism/move.py diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py new file mode 100644 index 0000000..f35450f --- /dev/null +++ b/orchestration/flows/dichroism/move.py @@ -0,0 +1,315 @@ +import datetime +import logging +from typing import Optional + +from prefect import flow, task +# from prefect.blocks.system import JSON + +from orchestration.flows.dichroism.config import ConfigDichroism +from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe +from orchestration.prefect import schedule_prefect_flow +from orchestration.transfer_controller import CopyMethod, get_transfer_controller + +logger = logging.getLogger(__name__) + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +def prune( + file_path: str = None, + source_endpoint: GlobusEndpoint = None, + check_endpoint: Optional[GlobusEndpoint] = None, + days_from_now: float = 0.0, + config: ConfigDichroism = None +) -> bool: + """ + Prune (delete) data from a globus endpoint. + If days_from_now is 0, executes pruning immediately. + Otherwise, schedules pruning for future execution using Prefect. + Args: + file_path (str): The path to the file or directory to prune + source_endpoint (GlobusEndpoint): The globus endpoint containing the data + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + days_from_now (float): Delay before pruning; if 0, prune immediately + Returns: + bool: True if pruning was successful or scheduled successfully, False otherwise + """ + if not file_path: + logger.error("No file_path provided for pruning operation") + return False + + if not source_endpoint: + logger.error("No source_endpoint provided for pruning operation") + return False + + if not config: + config = ConfigDichroism() + + if days_from_now < 0: + raise ValueError(f"Invalid days_from_now: {days_from_now}") + + # JSON blocks are deprecated, we should use what they recommend in the docs + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + + logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'") + + # convert float days → timedelta + delay: datetime.timedelta = datetime.timedelta(days=days_from_now) + + # If days_from_now is 0, prune immediately + if delay.total_seconds() == 0: + logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'") + return _prune_globus_endpoint( + relative_path=file_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config + ) + else: + # Otherwise, schedule pruning for future execution + logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' " + f"in {delay.total_seconds()/86400:.1f} days") + + try: + schedule_prefect_flow( + deployment_name="prune_globus_endpoint/prune_globus_endpoint", + parameters={ + "relative_path": file_path, + "source_endpoint": source_endpoint, + "check_endpoint": check_endpoint, + "config": config + }, + duration_from_now=delay, + ) + logger.info(f"Successfully scheduled pruning task for {delay.total_seconds()/86400:.1f} days from now") + return True + except Exception as e: + logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True) + return False + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +# @staticmethod +@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{relative_path}") +def _prune_globus_endpoint( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: Optional[GlobusEndpoint] = None, + config: ConfigDichroism = None +) -> None: + """ + Prefect flow that performs the actual Globus endpoint pruning operation. + Args: + relative_path (str): The path of the file or directory to prune + source_endpoint (GlobusEndpoint): The Globus endpoint to prune from + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + config (BeamlineConfig): Configuration object with transfer client + """ + logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") + + if not config: + config = ConfigDichroism() + + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + max_wait_seconds = 600 + flow_name = f"prune_from_{source_endpoint.name}" + logger.info(f"Running flow: {flow_name}") + logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") + prune_one_safe( + file=relative_path, + if_older_than_days=0, + transfer_client=config.tc, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + logger=logger, + max_wait_seconds=max_wait_seconds + ) + +# ---------------------------------------------- +# Flow and task to process new files at BL 4.0.2 +# ---------------------------------------------- + + +@flow(name="new_402_file_flow", flow_run_name="process_new-{file_path}") +def process_new_402_file_flow( + file_path: str, + config: Optional[ConfigDichroism] = None +) -> None: + process_new_402_file_task( + file_path=file_path, + config=config + ) + + +@task(name="new_402_file_task") +def process_new_402_file_task( + file_path: str, + config: Optional[ConfigDichroism] = None +) -> None: + """ + Flow to process a new file at BL 9.3.1 + 1. Copy the file from the data402 to NERSC CFS. Ingest file path in SciCat. + 2. Schedule pruning from data402. 6 months from now. + 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 4. Schedule pruning from NERSC CFS. + + :param file_path: Path to the new file to be processed. + :param config: Configuration settings for processing. + """ + + logger.info(f"Processing new 402 file: {file_path}") + + if not config: + config = ConfigDichroism() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + transfer_controller.copy( + file_path=file_path, + source=config.bl402_compute_dtn, + destination=config.bl402_nersc_alsdev_raw + ) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + # Schedule pruning from QNAP + # Waiting for PR #62 to be merged (prune_controller) + # TODO: Determine scheduling days_from_now based on beamline needs + prune( + file_path=file_path, + source_endpoint=config.bl402_compute_dtn, + check_endpoint=config.bl402_nersc_alsdev_raw, + days_from_now=180.0 # determine appropriate value: currently 6 months + ) + + # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? + # Waiting for PR #62 to be merged (transfer_controller) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + +@flow(name="move_402_flight_check", flow_run_name="move_402_flight_check-{file_path}") +def move_402_flight_check( + file_path: str = "test_directory/test.txt", +): + """Please keep your arms and legs inside the vehicle at all times.""" + logger.info("402 flight check: testing transfer from data402 to NERSC CFS") + + config = ConfigDichroism() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + success = transfer_controller.copy( + file_path=file_path, + source=config.data402_raw, + destination=config.nersc402_alsdev_raw + ) + if success is True: + logger.info("402 flight check: transfer successful") + else: + logger.error("402 flight check: transfer failed") + +# ---------------------------------------------- +# Flow and task to process new files at BL 6.3.1 +# ---------------------------------------------- + + +@flow(name="new_631_file_flow", flow_run_name="process_new-{file_path}") +def process_new_631_file_flow( + file_path: str, + config: Optional[ConfigDichroism] = None +) -> None: + process_new_631_file_task( + file_path=file_path, + config=config + ) + + +@task(name="new_631_file_task") +def process_new_631_file_task( + file_path: str, + config: Optional[ConfigDichroism] = None +) -> None: + """ + Flow to process a new file at BL 9.3.1 + 1. Copy the file from the data402 to NERSC CFS. Ingest file path in SciCat. + 2. Schedule pruning from data402. 6 months from now. + 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 4. Schedule pruning from NERSC CFS. + + :param file_path: Path to the new file to be processed. + :param config: Configuration settings for processing. + """ + + logger.info(f"Processing new 402 file: {file_path}") + + if not config: + config = ConfigDichroism() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + transfer_controller.copy( + file_path=file_path, + source=config.bl631_compute_dtn, + destination=config.bl631_nersc_alsdev_raw + ) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + # Schedule pruning from QNAP + # Waiting for PR #62 to be merged (prune_controller) + # TODO: Determine scheduling days_from_now based on beamline needs + prune( + file_path=file_path, + source_endpoint=config.bl631_compute_dtn, + check_endpoint=config.bl631_nersc_alsdev_raw, + days_from_now=180.0 # determine appropriate value: currently 6 months + ) + + # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? + # Waiting for PR #62 to be merged (transfer_controller) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + +@flow(name="move_631_flight_check", flow_run_name="move_631_flight_check-{file_path}") +def move_631_flight_check( + file_path: str = "test_directory/test.txt", +): + """Please keep your arms and legs inside the vehicle at all times.""" + logger.info("631 flight check: testing transfer from data631 to NERSC CFS") + + config = ConfigDichroism() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + success = transfer_controller.copy( + file_path=file_path, + source=config.data631_raw, + destination=config.nersc631_alsdev_raw + ) + if success is True: + logger.info("631 flight check: transfer successful") + else: + logger.error("631 flight check: transfer failed") From 7c4795146af96c264f3cfc73ada2448e84b9bb7a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 14:15:37 -0800 Subject: [PATCH 05/23] Adding prefect.yaml for dichroism beamlines --- orchestration/flows/dichroism/prefect.yaml | 56 ++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 orchestration/flows/dichroism/prefect.yaml diff --git a/orchestration/flows/dichroism/prefect.yaml b/orchestration/flows/dichroism/prefect.yaml new file mode 100644 index 0000000..448ce33 --- /dev/null +++ b/orchestration/flows/dichroism/prefect.yaml @@ -0,0 +1,56 @@ +name: dichroism +prefect-version: 3.4.2 +deployments: +- name: run_dichroism_dispatcher + entrypoint: orchestration/flows/dichroism/dispatcher.py:dispatcher + work_pool: + name: dispatcher_dichroism_pool + work_queue_name: dispatcher_dichroism_queue + +# 4.0.2 Flows +- name: new_file_402_flow + entrypoint: orchestration/flows/bl402/move.py:process_new_402_file_flow + work_pool: + name: new_file_402_pool + work_queue_name: new_file_402_queue + +- name: new_file_402_flight_check + entrypoint: orchestration/flows/bl402/move.py:move_402_flight_check + work_pool: + name: new_file_402_pool + work_queue_name: move_file_402_flight_check_queue + schedules: + - cron: "0 */12 * * *" # Every 12 hours + slug: "test-move-402-flight-check" + timezone: America/Los_Angeles + active: true + +- name: prune_data402 + entrypoint: orchestration/flows/bl402/move.py:_prune_globus_endpoint + work_pool: + name: prune_402_pool + work_queue_name: prune_402_queue + +# 6.3.1 Flows +- name: new_file_631_flow + entrypoint: orchestration/flows/bl631/move.py:process_new_631_file_flow + work_pool: + name: new_file_631_pool + work_queue_name: new_file_631_queue + +- name: new_file_631_flight_check + entrypoint: orchestration/flows/bl631/move.py:move_631_flight_check + work_pool: + name: new_file_631_pool + work_queue_name: move_file_631_flight_check_queue + schedules: + - cron: "0 */12 * * *" # Every 12 hours + slug: "test-move-631-flight-check" + timezone: America/Los_Angeles + active: true + +- name: prune_data631 + entrypoint: orchestration/flows/bl631/move.py:_prune_globus_endpoint + work_pool: + name: prune_631_pool + work_queue_name: prune_631_queue From e9ef4a330a5551b2125b77d70a15975afad30d08 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 14:21:38 -0800 Subject: [PATCH 06/23] Adding dispatcher for dichroism beamlines. Uses the same flow, but accepts either BL402 or BL631 parameters (Enum) to call the corresponding move task --- orchestration/flows/dichroism/dispatcher.py | 82 +++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 orchestration/flows/dichroism/dispatcher.py diff --git a/orchestration/flows/dichroism/dispatcher.py b/orchestration/flows/dichroism/dispatcher.py new file mode 100644 index 0000000..8706ed5 --- /dev/null +++ b/orchestration/flows/dichroism/dispatcher.py @@ -0,0 +1,82 @@ +import logging +from prefect import flow +from typing import Optional, Union, Any, Enum + +from orchestration.flows.dichroism.config import ConfigDichroism +from orchestration.flows.dichroism.move import process_new_402_file_task, process_new_631_file_task + +logger = logging.getLogger(__name__) + + +class DichroismBeamlineEnum(str, Enum): + BL402 = "BL402" + BL631 = "BL631" + + +# TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config402 +@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +def dispatcher( + file_path: Optional[str] = None, + is_export_control: bool = False, + config: Optional[Union[dict, Any]] = None, + beamline: DichroismBeamlineEnum = None +) -> None: + """ + Dispatcher flow for BL402 beamline that launches the new_402_file_flow. + + :param file_path: Path to the file to be processed. + :param is_export_control: Flag indicating if export control measures should be applied. + (Not used in the current BL402 processing) + :param config: Configuration settings for processing. + Expected to be an instance of ConfigDichroism or a dict that can be converted. + :param beamline: Beamline identifier (must be either BL402, BL631). + :raises ValueError: If no configuration is provided. + :raises TypeError: If the provided configuration is not a dict or ConfigDichroism. + """ + + logger.info("Starting dispatcher flow for Dichroism.") + logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}", beamline=beamline) + + # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. + if file_path is None: + logger.error("No file_path provided to dispatcher.") + raise ValueError("File path is required for processing.") + + if is_export_control: + logger.error("Data is under export control. Processing is not allowed.") + raise ValueError("Data is under export control. Processing is not allowed.") + + if config is None: + logger.info("No config provided, initializing default ConfigDichroism.") + config = ConfigDichroism() + + if beamline is None: + logger.error("No beamline specified.") + raise ValueError("Beamline must be specified as either BL402 or BL631.") + + if beamline == DichroismBeamlineEnum.BL402: + logger.info("Dispatching to BL402 processing flow.") + try: + process_new_402_file_task( + file_path=file_path, + config=config + ) + logger.info("Dispatcher flow completed successfully for BL402.") + except Exception as e: + logger.error(f"Error during processing in dispatcher flow for BL402: {e}") + raise + elif beamline == DichroismBeamlineEnum.BL631: + logger.info("Dispatching to BL631 processing flow.") + try: + process_new_631_file_task( + file_path=file_path, + config=config + ) + logger.info("Dispatcher flow completed successfully for BL631.") + except Exception as e: + logger.error(f"Error during processing in dispatcher flow for BL631: {e}") + raise + else: + logger.error(f"Invalid beamline specified: {beamline}") + raise ValueError(f"Invalid beamline specified: {beamline}. Must be either BL402 or BL631.") + logger.info("Dispatcher flow finished.") From 2bddfaf37b66f4ebffd825a0eae08f9d7a46d569 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 25 Nov 2025 14:53:12 -0800 Subject: [PATCH 07/23] Adding pytests for dichroism and adjusting dispatcher/move flows/tasks. --- .../_tests/test_dichroism/__init__.py | 0 .../_tests/test_dichroism/test_move.py | 334 ++++++++++++++++++ orchestration/flows/dichroism/dispatcher.py | 5 +- orchestration/flows/dichroism/move.py | 11 +- 4 files changed, 342 insertions(+), 8 deletions(-) create mode 100644 orchestration/_tests/test_dichroism/__init__.py create mode 100644 orchestration/_tests/test_dichroism/test_move.py diff --git a/orchestration/_tests/test_dichroism/__init__.py b/orchestration/_tests/test_dichroism/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/orchestration/_tests/test_dichroism/test_move.py b/orchestration/_tests/test_dichroism/test_move.py new file mode 100644 index 0000000..20ef940 --- /dev/null +++ b/orchestration/_tests/test_dichroism/test_move.py @@ -0,0 +1,334 @@ +'''Pytest unit tests for Dichroism move flow. ''' + +import logging +import pytest +from uuid import uuid4 + +from prefect.testing.utilities import prefect_test_harness +from prefect.blocks.system import Secret, JSON +from pytest_mock import MockFixture + +from orchestration._tests.test_transfer_controller import MockSecret + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """ + A pytest fixture that automatically sets up and tears down the Prefect test harness + for the entire test session. It creates and saves test secrets and configurations + required for Globus integration. + + Yields: + None + """ + with prefect_test_harness(): + globus_client_id = Secret(value=str(uuid4())) + globus_client_id.save(name="globus-client-id", overwrite=True) + + globus_client_secret = Secret(value=str(uuid4())) + globus_client_secret.save(name="globus-client-secret", overwrite=True) + + pruning_config = JSON(value={"max_wait_seconds": 600}) + pruning_config.save(name="pruning-config", overwrite=True) + + yield + + +# ---------------------------- +# Tests for 402 +# ---------------------------- + +def test_process_new_402_file_task(mocker: MockFixture) -> None: + """ + Test the process_new_402_file flow from orchestration.flows.dichroism.move. + + This test verifies that: + - The get_transfer_controller function is called (patched) with the correct parameters. + - The returned transfer controller's copy method is called with the expected file path, + source, and destination endpoints from the provided configuration. + + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the flow to test. + from orchestration.flows.dichroism.move import process_new_402_file_task + + # Patch the Secret.load and init_transfer_client in the configuration context. + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.dichroism.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + mocker.patch( + "orchestration.flows.dichroism.move.schedule_prefect_flow", + return_value=None + ) + + # Instantiate the mock configuration. + from orchestration.flows.dichroism.config import ConfigDichroism + mock_config = ConfigDichroism() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Create a mock transfer controller with a mocked 'copy' method. + mock_transfer_controller = mocker.MagicMock() + mock_transfer_controller.copy.return_value = True + + mock_prune = mocker.patch( + "orchestration.flows.dichroism.move.prune", + return_value=None + ) + + # Patch get_transfer_controller where it is used in process_new_931_file_task. + mocker.patch( + "orchestration.flows.dichroism.move.get_transfer_controller", + return_value=mock_transfer_controller + ) + + # Execute the move flow with the test file path and mock configuration. + result = process_new_402_file_task(file_path=test_file_path, config=mock_config) + + # Verify that the transfer controller's copy method was called exactly once. + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + # Reset mocks and test with config=None + mock_transfer_controller.copy.reset_mock() + mock_prune.reset_mock() + + result = process_new_402_file_task(file_path=test_file_path, config=None) + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + +# ---------------------------- +# Tests for 631 +# ---------------------------- + +def test_process_new_631_file_task(mocker: MockFixture) -> None: + """ + Test the process_new_631_file flow from orchestration.flows.dichroism.move. + + This test verifies that: + - The get_transfer_controller function is called (patched) with the correct parameters. + - The returned transfer controller's copy method is called with the expected file path, + source, and destination endpoints from the provided configuration. + + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the flow to test. + from orchestration.flows.dichroism.move import process_new_631_file_task + + # Patch the Secret.load and init_transfer_client in the configuration context. + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.dichroism.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + mocker.patch( + "orchestration.flows.dichroism.move.schedule_prefect_flow", + return_value=None + ) + + # Instantiate the dummy configuration. + from orchestration.flows.dichroism.config import ConfigDichroism + mock_config = ConfigDichroism() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Create a mock transfer controller with a mocked 'copy' method. + mock_transfer_controller = mocker.MagicMock() + mock_transfer_controller.copy.return_value = True + + mock_prune = mocker.patch( + "orchestration.flows.dichroism.move.prune", + return_value=None + ) + + # Patch get_transfer_controller where it is used in process_new_931_file_task. + mocker.patch( + "orchestration.flows.dichroism.move.get_transfer_controller", + return_value=mock_transfer_controller + ) + + # Execute the move flow with the test file path and mock configuration. + result = process_new_631_file_task(file_path=test_file_path, config=mock_config) + + # Verify that the transfer controller's copy method was called exactly once. + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + # Reset mocks and test with config=None + mock_transfer_controller.copy.reset_mock() + mock_prune.reset_mock() + + result = process_new_631_file_task(file_path=test_file_path, config=None) + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + +def test_dispatcher_dichroism_flow(mocker: MockFixture) -> None: + """ + Test the dispatcher flow for Dichroism. + + This test verifies that: + - The process_new_402_file_task or process_new_631_file_task functions are called with the correct parameters + when the dispatcher flow is executed. + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the dispatcher flow to test. + from orchestration.flows.dichroism.dispatcher import dispatcher, DichroismBeamlineEnum + + # Create a mock configuration object. + class MockConfig: + pass + + mock_config = MockConfig() + + # Generate a test file path. + test_file = f"/tmp/test_file_{uuid4()}.txt" + + # ----------------------------- + # Common patches used by dispatcher + # ----------------------------- + mocker.patch("prefect.blocks.system.Secret.load", return_value=MockSecret()) + mocker.patch( + "orchestration.flows.dichroism.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() + ) + mocker.patch( + "orchestration.flows.dichroism.move.schedule_prefect_flow", + return_value=None + ) + + # --------------------------------- + # Patch BOTH processing tasks + # --------------------------------- + mock_402 = mocker.patch( + "orchestration.flows.dichroism.dispatcher.process_new_402_file_task", + return_value=None + ) + mock_631 = mocker.patch( + "orchestration.flows.dichroism.dispatcher.process_new_631_file_task", + return_value=None + ) + + # ---------------------------------------------------------------------- + # 402 TEST + # ---------------------------------------------------------------------- + dispatcher( + file_path=test_file, + is_export_control=False, + config=mock_config, + beamline="BL402" + ) + + mock_402.assert_called_once_with(file_path=test_file, config=mock_config) + mock_631.assert_not_called() + + # Reset mocks to reuse + mock_402.reset_mock() + mock_631.reset_mock() + + # ---------------------------------------------------------------------- + # 402 TEST – config=None should still call 402 + # ---------------------------------------------------------------------- + dispatcher( + file_path=test_file, + is_export_control=False, + config=None, + beamline="BL402" + ) + + mock_402.assert_called_once() + mock_631.assert_not_called() + + mock_402.reset_mock() + mock_631.reset_mock() + + # ---------------------------------------------------------------------- + # 631 TEST + # ---------------------------------------------------------------------- + dispatcher( + file_path=test_file, + is_export_control=False, + config=mock_config, + beamline="BL631" + ) + + mock_631.assert_called_once_with(file_path=test_file, config=mock_config) + mock_402.assert_not_called() + + mock_402.reset_mock() + mock_631.reset_mock() + + # ---------------------------------------------------------------------- + # 631 TEST – config=None + # ---------------------------------------------------------------------- + dispatcher( + file_path=test_file, + is_export_control=False, + config=None, + beamline="BL631" + ) + + mock_631.assert_called_once() + mock_402.assert_not_called() + + mock_402.reset_mock() + mock_631.reset_mock() + + # ---------------------------------------------------------------------- + # Missing file_path → ValueError + # ---------------------------------------------------------------------- + with pytest.raises(ValueError): + dispatcher( + file_path=None, + is_export_control=False, + config=mock_config, + beamline="BL402" + ) + + mock_402.assert_not_called() + mock_631.assert_not_called() + + # ---------------------------------------------------------------------- + # export control flag blocks execution + # ---------------------------------------------------------------------- + with pytest.raises(ValueError): + dispatcher( + file_path=test_file, + is_export_control=True, + config=mock_config, + beamline="BL402" + ) + + mock_402.assert_not_called() + mock_631.assert_not_called() + + # ---------------------------------------------------------------------- + # Missing beamline enum → ValueError + # ---------------------------------------------------------------------- + with pytest.raises(ValueError): + dispatcher( + file_path=test_file, + is_export_control=False, + config=mock_config, + beamline=None + ) + + mock_402.assert_not_called() + mock_631.assert_not_called() diff --git a/orchestration/flows/dichroism/dispatcher.py b/orchestration/flows/dichroism/dispatcher.py index 8706ed5..16d4943 100644 --- a/orchestration/flows/dichroism/dispatcher.py +++ b/orchestration/flows/dichroism/dispatcher.py @@ -1,6 +1,7 @@ +from enum import Enum import logging from prefect import flow -from typing import Optional, Union, Any, Enum +from typing import Optional, Union, Any from orchestration.flows.dichroism.config import ConfigDichroism from orchestration.flows.dichroism.move import process_new_402_file_task, process_new_631_file_task @@ -19,7 +20,7 @@ def dispatcher( file_path: Optional[str] = None, is_export_control: bool = False, config: Optional[Union[dict, Any]] = None, - beamline: DichroismBeamlineEnum = None + beamline: Optional[DichroismBeamlineEnum] = None ) -> None: """ Dispatcher flow for BL402 beamline that launches the new_402_file_flow. diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index f35450f..3ce51db 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -152,7 +152,7 @@ def process_new_402_file_task( config: Optional[ConfigDichroism] = None ) -> None: """ - Flow to process a new file at BL 9.3.1 + Flow to process a new file at BL 4.0.2 1. Copy the file from the data402 to NERSC CFS. Ingest file path in SciCat. 2. Schedule pruning from data402. 6 months from now. 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. @@ -244,9 +244,9 @@ def process_new_631_file_task( config: Optional[ConfigDichroism] = None ) -> None: """ - Flow to process a new file at BL 9.3.1 - 1. Copy the file from the data402 to NERSC CFS. Ingest file path in SciCat. - 2. Schedule pruning from data402. 6 months from now. + Flow to process a new file at BL 6.3.1 + 1. Copy the file from the data631 to NERSC CFS. Ingest file path in SciCat. + 2. Schedule pruning from data631. 6 months from now. 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. 4. Schedule pruning from NERSC CFS. @@ -254,7 +254,7 @@ def process_new_631_file_task( :param config: Configuration settings for processing. """ - logger.info(f"Processing new 402 file: {file_path}") + logger.info(f"Processing new 631 file: {file_path}") if not config: config = ConfigDichroism() @@ -273,7 +273,6 @@ def process_new_631_file_task( # TODO: Ingest file path in SciCat # Waiting for PR #62 to be merged (scicat_controller) - # Schedule pruning from QNAP # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs prune( From 1bfd2b241a7d51452ed790547dbb7f0e4c4a35e2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 16:10:18 -0800 Subject: [PATCH 08/23] updating init_work_pools to handle when beamline id is a number (e.g. 733 -> bl733 is the folder name) or a word (dichroism -> dichroism is the folder name) --- init_work_pools.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/init_work_pools.py b/init_work_pools.py index de150b6..f65aa3e 100644 --- a/init_work_pools.py +++ b/init_work_pools.py @@ -49,10 +49,15 @@ def check_env() -> tuple[str, str, str]: """Validate required environment variables and paths.""" beamline = os.environ.get("BEAMLINE") if not beamline: - logger.error("Must set BEAMLINE (e.g., 832, 733)") + logger.error("Must set BEAMLINE (e.g., 832, 733, dichroism)") sys.exit(1) - prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml" + # Check if the beamline identifier is a number or a string to get the correct flows folder name + if beamline.isdigit(): + prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml" + else: + prefect_yaml = f"orchestration/flows/{beamline}/prefect.yaml" + if not os.path.isfile(prefect_yaml): logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!") sys.exit(1) From 5c4523b86a3dde3b2d9569220a4f85942af59a8e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 16:22:25 -0800 Subject: [PATCH 09/23] Fixing dichroism beamline folder name in prefect.yaml --- orchestration/flows/dichroism/prefect.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/orchestration/flows/dichroism/prefect.yaml b/orchestration/flows/dichroism/prefect.yaml index 448ce33..fc6146c 100644 --- a/orchestration/flows/dichroism/prefect.yaml +++ b/orchestration/flows/dichroism/prefect.yaml @@ -9,13 +9,13 @@ deployments: # 4.0.2 Flows - name: new_file_402_flow - entrypoint: orchestration/flows/bl402/move.py:process_new_402_file_flow + entrypoint: orchestration/flows/dichroism/move.py:process_new_402_file_flow work_pool: name: new_file_402_pool work_queue_name: new_file_402_queue - name: new_file_402_flight_check - entrypoint: orchestration/flows/bl402/move.py:move_402_flight_check + entrypoint: orchestration/flows/dichroism/move.py:move_402_flight_check work_pool: name: new_file_402_pool work_queue_name: move_file_402_flight_check_queue @@ -26,20 +26,20 @@ deployments: active: true - name: prune_data402 - entrypoint: orchestration/flows/bl402/move.py:_prune_globus_endpoint + entrypoint: orchestration/flows/dichroism/move.py:_prune_globus_endpoint work_pool: name: prune_402_pool work_queue_name: prune_402_queue # 6.3.1 Flows - name: new_file_631_flow - entrypoint: orchestration/flows/bl631/move.py:process_new_631_file_flow + entrypoint: orchestration/flows/dichroism/move.py:process_new_631_file_flow work_pool: name: new_file_631_pool work_queue_name: new_file_631_queue - name: new_file_631_flight_check - entrypoint: orchestration/flows/bl631/move.py:move_631_flight_check + entrypoint: orchestration/flows/dichroism/move.py:move_631_flight_check work_pool: name: new_file_631_pool work_queue_name: move_file_631_flight_check_queue @@ -50,7 +50,7 @@ deployments: active: true - name: prune_data631 - entrypoint: orchestration/flows/bl631/move.py:_prune_globus_endpoint + entrypoint: orchestration/flows/dichroism/move.py:_prune_globus_endpoint work_pool: name: prune_631_pool work_queue_name: prune_631_queue From 73293bc50e3d3850d55a43e6f18c40e35674f579 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 3 Dec 2025 16:26:16 -0800 Subject: [PATCH 10/23] Making 402/631 flows live on the same pools, but separate queues --- orchestration/flows/dichroism/prefect.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/orchestration/flows/dichroism/prefect.yaml b/orchestration/flows/dichroism/prefect.yaml index fc6146c..3325bfe 100644 --- a/orchestration/flows/dichroism/prefect.yaml +++ b/orchestration/flows/dichroism/prefect.yaml @@ -11,13 +11,13 @@ deployments: - name: new_file_402_flow entrypoint: orchestration/flows/dichroism/move.py:process_new_402_file_flow work_pool: - name: new_file_402_pool + name: new_file_dichroism_pool work_queue_name: new_file_402_queue - name: new_file_402_flight_check entrypoint: orchestration/flows/dichroism/move.py:move_402_flight_check work_pool: - name: new_file_402_pool + name: new_file_dichroism_pool work_queue_name: move_file_402_flight_check_queue schedules: - cron: "0 */12 * * *" # Every 12 hours @@ -28,20 +28,20 @@ deployments: - name: prune_data402 entrypoint: orchestration/flows/dichroism/move.py:_prune_globus_endpoint work_pool: - name: prune_402_pool + name: prune_dichroism_pool work_queue_name: prune_402_queue # 6.3.1 Flows - name: new_file_631_flow entrypoint: orchestration/flows/dichroism/move.py:process_new_631_file_flow work_pool: - name: new_file_631_pool + name: new_file_dichroism_pool work_queue_name: new_file_631_queue - name: new_file_631_flight_check entrypoint: orchestration/flows/dichroism/move.py:move_631_flight_check work_pool: - name: new_file_631_pool + name: new_file_dichroism_pool work_queue_name: move_file_631_flight_check_queue schedules: - cron: "0 */12 * * *" # Every 12 hours @@ -52,5 +52,5 @@ deployments: - name: prune_data631 entrypoint: orchestration/flows/dichroism/move.py:_prune_globus_endpoint work_pool: - name: prune_631_pool + name: prune_dichroism_pool work_queue_name: prune_631_queue From 533425fc945ca25670404eba9cb67b2e85309d56 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 12:04:16 -0800 Subject: [PATCH 11/23] Converting JSON Block to Variable Block --- .../_tests/test_dichroism/test_move.py | 23 +++++++++++++++---- orchestration/flows/dichroism/move.py | 22 ++++++++++-------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/orchestration/_tests/test_dichroism/test_move.py b/orchestration/_tests/test_dichroism/test_move.py index 20ef940..10090b3 100644 --- a/orchestration/_tests/test_dichroism/test_move.py +++ b/orchestration/_tests/test_dichroism/test_move.py @@ -5,7 +5,8 @@ from uuid import uuid4 from prefect.testing.utilities import prefect_test_harness -from prefect.blocks.system import Secret, JSON +from prefect.blocks.system import Secret +from prefect.variables import Variable from pytest_mock import MockFixture from orchestration._tests.test_transfer_controller import MockSecret @@ -31,8 +32,22 @@ def prefect_test_fixture(): globus_client_secret = Secret(value=str(uuid4())) globus_client_secret.save(name="globus-client-secret", overwrite=True) - pruning_config = JSON(value={"max_wait_seconds": 600}) - pruning_config.save(name="pruning-config", overwrite=True) + Variable.set( + name="globus-settings", + value={"max_wait_seconds": 600}, + overwrite=True, + _sync=True + ) + + Variable.set( + name="dichroism-settings", + value={ + "delete_data402_files_after_days": 180, + "delete_data631_files_after_days": 180 + }, + overwrite=True, + _sync=True + ) yield @@ -190,7 +205,7 @@ def test_dispatcher_dichroism_flow(mocker: MockFixture) -> None: mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. """ # Import the dispatcher flow to test. - from orchestration.flows.dichroism.dispatcher import dispatcher, DichroismBeamlineEnum + from orchestration.flows.dichroism.dispatcher import dispatcher # Create a mock configuration object. class MockConfig: diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index 3ce51db..0b8de83 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -3,7 +3,7 @@ from typing import Optional from prefect import flow, task -# from prefect.blocks.system import JSON +from prefect.variables import Variable from orchestration.flows.dichroism.config import ConfigDichroism from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe @@ -49,10 +49,6 @@ def prune( if days_from_now < 0: raise ValueError(f"Invalid days_from_now: {days_from_now}") - # JSON blocks are deprecated, we should use what they recommend in the docs - # globus_settings = JSON.load("globus-settings").value - # max_wait_seconds = globus_settings["max_wait_seconds"] - logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'") # convert float days → timedelta @@ -114,9 +110,9 @@ def _prune_globus_endpoint( if not config: config = ConfigDichroism() - # globus_settings = JSON.load("globus-settings").value - # max_wait_seconds = globus_settings["max_wait_seconds"] - max_wait_seconds = 600 + globus_settings = Variable.get("globus-settings") + max_wait_seconds = globus_settings["max_wait_seconds"] + flow_name = f"prune_from_{source_endpoint.name}" logger.info(f"Running flow: {flow_name}") logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") @@ -184,11 +180,14 @@ def process_new_402_file_task( # Schedule pruning from QNAP # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs + + dichroism_settings = Variable.get("dichroism-settings") + prune( file_path=file_path, source_endpoint=config.bl402_compute_dtn, check_endpoint=config.bl402_nersc_alsdev_raw, - days_from_now=180.0 # determine appropriate value: currently 6 months + days_from_now=dichroism_settings["delete_data402_files_after_days"] # determine appropriate value: currently 6 months ) # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? @@ -275,11 +274,14 @@ def process_new_631_file_task( # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs + + dichroism_settings = Variable.get("dichroism-settings") + prune( file_path=file_path, source_endpoint=config.bl631_compute_dtn, check_endpoint=config.bl631_nersc_alsdev_raw, - days_from_now=180.0 # determine appropriate value: currently 6 months + days_from_now=dichroism_settings["delete_data631_files_after_days"] # determine appropriate value: currently 6 months ) # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? From 929b7b0f603745d62271bbc36291f952c6cc766b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 23 Dec 2025 09:30:33 -0800 Subject: [PATCH 12/23] Fixing endpoint variable name typos --- orchestration/flows/dichroism/move.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index 0b8de83..aecf1e8 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -213,8 +213,8 @@ def move_402_flight_check( success = transfer_controller.copy( file_path=file_path, - source=config.data402_raw, - destination=config.nersc402_alsdev_raw + source=config.bl402_compute_dtn, + destination=config.bl402_nersc_alsdev_raw ) if success is True: logger.info("402 flight check: transfer successful") @@ -307,8 +307,8 @@ def move_631_flight_check( success = transfer_controller.copy( file_path=file_path, - source=config.data631_raw, - destination=config.nersc631_alsdev_raw + source=config.bl631_compute_dtn, + destination=config.bl631_nersc_alsdev_raw ) if success is True: logger.info("631 flight check: transfer successful") From 0aae72c50264d113c64b613d6713c57b2f92c998 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 23 Dec 2025 10:00:49 -0800 Subject: [PATCH 13/23] Adding RuntimeError to flight checks in case the transfer fails, so the flow fails if the transfer fails. --- orchestration/flows/dichroism/move.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index aecf1e8..8d7f97c 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -220,6 +220,7 @@ def move_402_flight_check( logger.info("402 flight check: transfer successful") else: logger.error("402 flight check: transfer failed") + raise RuntimeError("402 flight check: transfer failed") # ---------------------------------------------- # Flow and task to process new files at BL 6.3.1 @@ -314,3 +315,4 @@ def move_631_flight_check( logger.info("631 flight check: transfer successful") else: logger.error("631 flight check: transfer failed") + raise RuntimeError("631 flight check: transfer failed") From e044c01fd44a2be427521282b949c23324868c5c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 23 Dec 2025 11:38:20 -0800 Subject: [PATCH 14/23] Adding documentation for dichroism beamlines --- docs/mkdocs/docs/dichroism.md | 83 +++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 docs/mkdocs/docs/dichroism.md diff --git a/docs/mkdocs/docs/dichroism.md b/docs/mkdocs/docs/dichroism.md new file mode 100644 index 0000000..3b1ca74 --- /dev/null +++ b/docs/mkdocs/docs/dichroism.md @@ -0,0 +1,83 @@ +# Dichroism Beamline Flows +This page documents the workflows supported by Splash Flows for the ALS Dichroism beamlines: + +- [Beamline 4.0.2](https://als.lbl.gov/beamlines/4-0-2/) +- [Beamline 6.3.1](https://als.lbl.gov/beamlines/6-3-1/) + +**Data at Dichroism Beamlines** +These beamlines generate X-ray magnetic circular dichroism (XMCD) and X-ray magnetic linear dichroism (XMLD) spectroscopy data. + +## File Watcher +There is a file watcher on the acquisition system that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call dispatcher kicks off the downstream steps: + +Copy scans in real time from a Globus collection on the compute-dtn server to NERSC CFS using Globus Transfer. +Copy project data to NERSC HPSS for long-term storage (TBD). +Ingest into SciCat (TBD). +Schedule data pruning from compute-dtn and NERSC CFS. + +## Prefect Configuration + +### Registered Flows +#### dispatcher.py +The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. Once a new file is written, the dispatcher() Flow is called with either BL402 or BL631 as a parameter to specify the beamline. The dispatcher handles the synchronous call to the appropriate move task. +move.py +Contains separate move tasks/flows for each beamline: + +- process_new_402_file: Flow to process a new file at BL 4.0.2 +- process_new_631_file: Flow to process a new file at BL 6.3.1 + +Each flow performs the following steps: + +Copy the file from compute-dtn to NERSC CFS and ingest the file path and metadata into SciCat. +Schedule pruning from compute-dtn. +Copy the file from NERSC CFS to NERSC HPSS. Ingest the archived file path in SciCat. +Schedule pruning from NERSC CFS. + +### Work Pools and Queues +The following work pools are defined in orchestration/flows/dichroism/prefect.yaml: +DeploymentWork PoolWork Queuerun_dichroism_dispatcherdispatcher_dichroism_pooldispatcher_402_queue / dispatcher_631_queuenew_file_402new_file_dichroism_poolnew_file_402_queuenew_file_631new_file_dichroism_poolnew_file_631_queuetest_transfers_dichroismnew_file_dichroism_pooltest_transfers_dichroism_queue +Both beamlines share the same work pools but use separate queues for fine-grained control. +Configuration +Globus Endpoints +Endpoints are defined in config.yml: +yamldata402: + root_path: /path/to/compute-dtn/4.0.2/data + uri: compute-dtn.als.lbl.gov + uuid: + name: data402 + +nersc402: + root_path: /global/cfs/cdirs/als/data_mover/4.0.2 + uri: nersc.gov + uuid: + name: nersc402 + +data631: + root_path: /path/to/compute-dtn/6.3.1/data + uri: compute-dtn.als.lbl.gov + uuid: + name: data631 + +nersc631: + root_path: /global/cfs/cdirs/als/data_mover/6.3.1 + uri: nersc.gov + uuid: + name: nersc631 +Environment Variables +Required environment variables (set in .env or container environment): +bashGLOBUS_CLIENT_ID= +GLOBUS_CLIENT_SECRET= +PREFECT_API_URL= +PREFECT_API_KEY= +Deployment +Register Flows +Using the init script with Docker: +bashBEAMLINE=dichroism ./init_work_pools.py +Or deploy manually: +bashprefect deploy --prefect-file orchestration/flows/dichroism/prefect.yaml --all +Start Workers +bashprefect worker start --pool "dispatcher_dichroism_pool" +prefect worker start --pool "new_file_dichroism_pool" +VM Details +The computing backend runs on a VM managed by ALS IT staff. +flow-dichroism.als.lbl.govShareArtifactsDownload allDichroism readmeDocument · MD Project contentsplash_flowsCreated by youals-computing/splash_flowsmainGITHUB \ No newline at end of file From f45b9bb1072dea429277d3d9a4f2fc9fa17c4659 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 23 Dec 2025 11:44:44 -0800 Subject: [PATCH 15/23] Adding dichroism.md to mkdocs.yml --- docs/mkdocs/mkdocs.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml index 05e6a2d..f0004dd 100644 --- a/docs/mkdocs/mkdocs.yml +++ b/docs/mkdocs/mkdocs.yml @@ -15,8 +15,10 @@ nav: - Getting Started: getting_started.md - Beamline Implementations: - 7.3.3 SAXS/WAXS/GISAXS: bl733.md - - 8.3.2 Micro Tomography - Compute at ALCF: alcf832.md - - 8.3.2 Micro Tomography - Compute at NERSC: nersc832.md + - Beamline 8.3.2 - Microtomography: + - Compute at ALCF: alcf832.md + - Compute at NERSC: nersc832.md + - Dichroism Beamlines (4.0.2 and 6.3.1): dichroism.md - Orchestration: orchestration.md - Configuration: configuration.md # - Troubleshooting: troubleshooting.md From 1aac2c8be83d83725fc7d05b85e678de152f96dd Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 23 Dec 2025 11:46:40 -0800 Subject: [PATCH 16/23] adjusting formatting and adding flow diagram for dichroism --- docs/mkdocs/docs/dichroism.md | 190 ++++++++++++++++++++++++---------- 1 file changed, 133 insertions(+), 57 deletions(-) diff --git a/docs/mkdocs/docs/dichroism.md b/docs/mkdocs/docs/dichroism.md index 3b1ca74..6a6e6e6 100644 --- a/docs/mkdocs/docs/dichroism.md +++ b/docs/mkdocs/docs/dichroism.md @@ -4,80 +4,156 @@ This page documents the workflows supported by Splash Flows for the ALS Dichrois - [Beamline 4.0.2](https://als.lbl.gov/beamlines/4-0-2/) - [Beamline 6.3.1](https://als.lbl.gov/beamlines/6-3-1/) -**Data at Dichroism Beamlines** -These beamlines generate X-ray magnetic circular dichroism (XMCD) and X-ray magnetic linear dichroism (XMLD) spectroscopy data. - ## File Watcher -There is a file watcher on the acquisition system that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call dispatcher kicks off the downstream steps: +There is a file watcher on the acquisition system that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps: -Copy scans in real time from a Globus collection on the compute-dtn server to NERSC CFS using Globus Transfer. -Copy project data to NERSC HPSS for long-term storage (TBD). -Ingest into SciCat (TBD). -Schedule data pruning from compute-dtn and NERSC CFS. +- Copy scans in real time from a Globus collection on the compute-dtn server to `NERSC CFS` using Globus Transfer. +- Copy project data to `NERSC HPSS` for long-term storage (TBD). +- Ingest into SciCat (TBD). +- Schedule data pruning from `compute-dtn` and `NERSC CFS`. ## Prefect Configuration ### Registered Flows -#### dispatcher.py -The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. Once a new file is written, the dispatcher() Flow is called with either BL402 or BL631 as a parameter to specify the beamline. The dispatcher handles the synchronous call to the appropriate move task. -move.py +#### [dispatcher.py](orchestration/flows/dichroism/dispatcher.py) +The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. Once a new file is written, the `dispatcher()` Flow is called with either `BL402` or `BL631` as a parameter to specify the beamline. The dispatcher handles the synchronous call to the appropriate move task. + +#### [move.py](orchestration/flows/dichroism/move.py) Contains separate move tasks/flows for each beamline: -- process_new_402_file: Flow to process a new file at BL 4.0.2 -- process_new_631_file: Flow to process a new file at BL 6.3.1 +- `process_new_402_file`: Flow to process a new file at BL 4.0.2 +- `process_new_631_file`: Flow to process a new file at BL 6.3.1 Each flow performs the following steps: -Copy the file from compute-dtn to NERSC CFS and ingest the file path and metadata into SciCat. -Schedule pruning from compute-dtn. -Copy the file from NERSC CFS to NERSC HPSS. Ingest the archived file path in SciCat. -Schedule pruning from NERSC CFS. +- Copy the file from `compute-dtn` to `NERSC CFS` and ingest the file path and metadata into SciCat. +- Schedule pruning from `compute-dtn`. +- Copy the file from `NERSC CFS` to `NERSC HPSS`. Ingest the archived file path in SciCat. +- Schedule pruning from `NERSC CFS` (after archiving). ### Work Pools and Queues -The following work pools are defined in orchestration/flows/dichroism/prefect.yaml: -DeploymentWork PoolWork Queuerun_dichroism_dispatcherdispatcher_dichroism_pooldispatcher_402_queue / dispatcher_631_queuenew_file_402new_file_dichroism_poolnew_file_402_queuenew_file_631new_file_dichroism_poolnew_file_631_queuetest_transfers_dichroismnew_file_dichroism_pooltest_transfers_dichroism_queue +The following work pools are defined in `orchestration/flows/dichroism/prefect.yaml`: + +| Deployment | Work Pool | Work Queue | +|------------|-----------|------------| +| `run_dichroism_dispatcher` | `dispatcher_dichroism_pool` | `dispatcher_402_queue` / `dispatcher_631_queue` | +| `new_file_402` | `new_file_dichroism_pool` | `new_file_402_queue` | +| `new_file_631` | `new_file_dichroism_pool` | `new_file_631_queue` | +| `test_transfers_dichroism` | `new_file_dichroism_pool` | `test_transfers_dichroism_queue` | + Both beamlines share the same work pools but use separate queues for fine-grained control. -Configuration -Globus Endpoints + +### Endpoint Configuration +**Globus Endpoints** Endpoints are defined in config.yml: -yamldata402: - root_path: /path/to/compute-dtn/4.0.2/data - uri: compute-dtn.als.lbl.gov - uuid: - name: data402 - -nersc402: - root_path: /global/cfs/cdirs/als/data_mover/4.0.2 - uri: nersc.gov - uuid: - name: nersc402 - -data631: - root_path: /path/to/compute-dtn/6.3.1/data - uri: compute-dtn.als.lbl.gov - uuid: - name: data631 - -nersc631: - root_path: /global/cfs/cdirs/als/data_mover/6.3.1 - uri: nersc.gov - uuid: - name: nersc631 -Environment Variables -Required environment variables (set in .env or container environment): -bashGLOBUS_CLIENT_ID= -GLOBUS_CLIENT_SECRET= -PREFECT_API_URL= -PREFECT_API_KEY= -Deployment -Register Flows +```yaml +bl402-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: + name: bl402-compute-dtn + +bl402-nersc_alsdev_raw: + root_path: /global/cfs/cdirs/als/data_mover/4.0.2/raw + uri: nersc.gov + uuid: + name: bl402-nersc_alsdev_raw + +bl631-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: + name: bl631-compute-dtn + +bl631-nersc_alsdev_raw: + root_path: /global/cfs/cdirs/als/data_mover/6.3.1/raw + uri: nersc.gov + uuid: + name: bl631-nersc_alsdev_raw +``` + +#### Deployment +**Register Flows** Using the init script with Docker: -bashBEAMLINE=dichroism ./init_work_pools.py +```bash +BEAMLINE=dichroism ./init_work_pools.py +``` Or deploy manually: -bashprefect deploy --prefect-file orchestration/flows/dichroism/prefect.yaml --all +```bash +prefect deploy --prefect-file orchestration/flows/dichroism/prefect.yaml --all +``` Start Workers -bashprefect worker start --pool "dispatcher_dichroism_pool" + +```bash +prefect worker start --pool "dispatcher_dichroism_pool" prefect worker start --pool "new_file_dichroism_pool" -VM Details +``` + +Deployment of the Prefect Server and Workers is handled in the [als_ansible](https://github.com/als-computing/als_ansible) repository. + +#### VM Details The computing backend runs on a VM managed by ALS IT staff. -flow-dichroism.als.lbl.govShareArtifactsDownload allDichroism readmeDocument · MD Project contentsplash_flowsCreated by youals-computing/splash_flowsmainGITHUB \ No newline at end of file +flow-dichroism.als.lbl.gov + +## Flow Diagram +```mermaid +sequenceDiagram + participant DET as Detector/
File Watcher + participant DISP as Prefect
Dispatcher + participant DTN as compute-dtn
Storage + participant GLOB as Globus
Transfer + participant CFS as NERSC
CFS + participant CAT as SciCat
Metadata + participant HPSS as NERSC
HPSS + + %% Initial Trigger + DET->>DET: Monitor filesystem + DET->>DISP: Trigger on new file + DISP->>DISP: Route to BL402 or BL631 + + %% Flow 1: new_file (402 or 631) + rect rgb(220, 230, 255) + note over DISP,CAT: FLOW 1: new_file_402 / new_file_631 + DISP->>GLOB: Init transfer + activate GLOB + GLOB->>DTN: Read from compute-dtn + DTN-->>GLOB: Data + GLOB->>CFS: Write to NERSC CFS + GLOB-->>DISP: Transfer complete + deactivate GLOB + + DISP->>CAT: Register metadata (TBD) + end + + %% Flow 2: HPSS Archive + rect rgb(220, 255, 230) + note over DISP,HPSS: FLOW 2: HPSS Archive (TBD) + DISP->>GLOB: Init archive transfer + activate GLOB + GLOB->>CFS: Read from CFS + CFS-->>GLOB: Data + GLOB->>HPSS: Write to tape + GLOB-->>DISP: Archive complete + deactivate GLOB + + DISP->>CAT: Update metadata (TBD) + end + + %% Flow 3: Scheduled Pruning + rect rgb(255, 255, 220) + note over DISP,CFS: FLOW 3: Scheduled Pruning + DISP->>DISP: Schedule prune (6 months) + + DISP->>DTN: Prune from compute-dtn + activate DTN + DTN->>DTN: Delete expired data + DTN-->>DISP: Pruning complete + deactivate DTN + + DISP->>CFS: Prune from CFS (after HPSS) + activate CFS + CFS->>CFS: Delete expired data + CFS-->>DISP: Pruning complete + deactivate CFS + end +``` \ No newline at end of file From 0cc7511912f4390e473861464b3c6c5204847354 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:27:25 -0800 Subject: [PATCH 17/23] updating dispatcher for dichroism to handle list of files --- orchestration/flows/dichroism/dispatcher.py | 42 +++++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/orchestration/flows/dichroism/dispatcher.py b/orchestration/flows/dichroism/dispatcher.py index 16d4943..4645cf2 100644 --- a/orchestration/flows/dichroism/dispatcher.py +++ b/orchestration/flows/dichroism/dispatcher.py @@ -1,8 +1,10 @@ from enum import Enum import logging -from prefect import flow +from pathlib import Path from typing import Optional, Union, Any +from prefect import flow, runtime + from orchestration.flows.dichroism.config import ConfigDichroism from orchestration.flows.dichroism.move import process_new_402_file_task, process_new_631_file_task @@ -14,18 +16,32 @@ class DichroismBeamlineEnum(str, Enum): BL631 = "BL631" +def generate_flow_run_name() -> str: + """Generate flow run name from runtime parameters.""" + params = runtime.flow_run.parameters + file_path = params.get("file_path") + if file_path is None: + return "dispatcher-no_file" + elif isinstance(file_path, str): + return f"dispatcher-{Path(file_path).name}" + elif len(file_path) == 1: + return f"dispatcher-{Path(file_path[0]).name}" + else: + return f"dispatcher-{Path(file_path[0]).name}_+{len(file_path)-1}_more" + + # TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config402 -@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +@flow(name="dispatcher", flow_run_name=generate_flow_run_name) def dispatcher( - file_path: Optional[str] = None, + file_path: Optional[Union[str, list[str]]] = None, is_export_control: bool = False, config: Optional[Union[dict, Any]] = None, beamline: Optional[DichroismBeamlineEnum] = None ) -> None: """ - Dispatcher flow for BL402 beamline that launches the new_402_file_flow. + Dispatcher flow for BL402 and BL631 beamlines that launches the new_402_file_flow. - :param file_path: Path to the file to be processed. + :param file_path: Path(s) to the file(s) to be processed. :param is_export_control: Flag indicating if export control measures should be applied. (Not used in the current BL402 processing) :param config: Configuration settings for processing. @@ -35,11 +51,21 @@ def dispatcher( :raises TypeError: If the provided configuration is not a dict or ConfigDichroism. """ - logger.info("Starting dispatcher flow for Dichroism.") - logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}", beamline=beamline) + # Normalize file_path to a list + if file_path is None: + file_paths = [] + elif isinstance(file_path, str): + file_paths = [file_path] + else: + file_paths = file_path + + num_files = len(file_paths) + + logger.info(f"Starting dispatcher flow for Dichroism with {num_files} file(s)") + logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}, beamline={beamline}") # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. - if file_path is None: + if not file_paths: # returns True for empty list logger.error("No file_path provided to dispatcher.") raise ValueError("File path is required for processing.") From aec9824ad4e95c395a220ad2fe7df6287b89be11 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:32:46 -0800 Subject: [PATCH 18/23] adding get_common_parent_path() function for dichroism move to handle multiple files in 1 globus transfer. We may want to move this to a higher level, so the same function can be used by multiple beamlines --- orchestration/flows/dichroism/move.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index 8d7f97c..451e114 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -1,5 +1,7 @@ import datetime import logging +import os +from pathlib import Path from typing import Optional from prefect import flow, task @@ -16,6 +18,24 @@ # Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. +def get_common_parent_path(file_paths: list[str]) -> str: + """ + Find the highest common parent directory for a list of file paths. + + :param file_paths: List of file paths + :return: Common parent directory path + """ + if not file_paths: + raise ValueError("No file paths provided") + + if len(file_paths) == 1: + # Single file - return its parent directory + return str(Path(file_paths[0]).parent) + + # Use os.path.commonpath for multiple files + return os.path.commonpath(file_paths) + + def prune( file_path: str = None, source_endpoint: GlobusEndpoint = None, From a75e3084d441efad95b60d61944d1eb1ebcdc877 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:45:49 -0800 Subject: [PATCH 19/23] Adding beegfs endpoints to dichroism beamlines config --- config.yml | 12 ++++++++++++ orchestration/flows/dichroism/config.py | 2 ++ 2 files changed, 14 insertions(+) diff --git a/config.yml b/config.yml index 0a69522..d8eb9de 100644 --- a/config.yml +++ b/config.yml @@ -15,6 +15,12 @@ globus: uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 name: bl402-nersc_alsdev_raw + bl402-beegfs_raw: + root_path: /beamline_staging/bl402/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl402-beegfs_raw + bl631-compute-dtn: root_path: / uri: compute-dtn.als.lbl.gov @@ -27,6 +33,12 @@ globus: uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 name: bl631-nersc_alsdev_raw + bl631-beegfs_raw: + root_path: /beamline_staging/bl631/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl631-beegfs_raw + # 7.0.1.2 ENDPOINTS nersc7012: diff --git a/orchestration/flows/dichroism/config.py b/orchestration/flows/dichroism/config.py index 643943c..5fe963b 100644 --- a/orchestration/flows/dichroism/config.py +++ b/orchestration/flows/dichroism/config.py @@ -11,5 +11,7 @@ def __init__(self) -> None: self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) self.bl402_compute_dtn = self.endpoints["bl402-compute-dtn"] self.bl402_nersc_alsdev_raw = self.endpoints["bl402-nersc_alsdev_raw"] + self.bl402_beegfs_raw = self.endpoints["bl402-beegfs_raw"] self.bl631_compute_dtn = self.endpoints["bl631-compute-dtn"] self.bl631_nersc_alsdev_raw = self.endpoints["bl631-nersc_alsdev_raw"] + self.bl631_beegfs_raw = self.endpoints["bl631-beegfs_raw"] From 1a18784b22e8073a76e7288bb3875a8c2af1cef3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:46:25 -0800 Subject: [PATCH 20/23] Updating bl402 move task to handle a list of files, more verbose logging, and staging on beegfs --- orchestration/flows/dichroism/move.py | 88 ++++++++++++++++++++------- 1 file changed, 67 insertions(+), 21 deletions(-) diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index 451e114..e7585e3 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -2,9 +2,9 @@ import logging import os from pathlib import Path -from typing import Optional +from typing import Optional, Union -from prefect import flow, task +from prefect import flow, get_run_logger, task from prefect.variables import Variable from orchestration.flows.dichroism.config import ConfigDichroism @@ -153,9 +153,16 @@ def _prune_globus_endpoint( @flow(name="new_402_file_flow", flow_run_name="process_new-{file_path}") def process_new_402_file_flow( - file_path: str, + file_path: Union[str, list[str]], config: Optional[ConfigDichroism] = None ) -> None: + """ + process_new_402_file_flow calls the task to process a new file at BL 4.0.2 + + :param file_path: Path to the new file to be processed. + :param config: Beamline configuration settings for processing. + :return: None + """ process_new_402_file_task( file_path=file_path, config=config @@ -164,35 +171,63 @@ def process_new_402_file_flow( @task(name="new_402_file_task") def process_new_402_file_task( - file_path: str, + file_path: Union[str, list[str]], config: Optional[ConfigDichroism] = None ) -> None: """ Flow to process a new file at BL 4.0.2 - 1. Copy the file from the data402 to NERSC CFS. Ingest file path in SciCat. + 1. Copy the file(s) from the data402 to NERSC CFS. Ingest file path in SciCat. 2. Schedule pruning from data402. 6 months from now. - 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 3. Copy the file(s) from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. 4. Schedule pruning from NERSC CFS. - :param file_path: Path to the new file to be processed. + :param file_path: Path to the new file(s) to be processed. :param config: Configuration settings for processing. """ + logger = get_run_logger() - logger.info(f"Processing new 402 file: {file_path}") + # Normalize file_path to a list + if file_path is None: + file_paths = [] + elif isinstance(file_path, str): + file_paths = [file_path] + else: + file_paths = file_path + + if not file_paths: + logger.error("No file_paths provided") + raise ValueError("No file_paths provided") + + logger.info(f"Processing new 402 file(s): {file_paths}") if not config: + logger.info("No config provided, initializing default ConfigDichroism") config = ConfigDichroism() + common_path = get_common_parent_path(file_paths) + logger.info(f"Common parent path: {common_path}") + + logger.info("Initializing Globus transfer controller") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) + logger.info(f"Step 1: Copying {common_path} from data402 to beegfs ({config.bl402_beegfs_raw.name})") transfer_controller.copy( - file_path=file_path, + file_path=common_path, + source=config.bl402_compute_dtn, + destination=config.bl402_beegfs_raw + ) + logger.info("Step 1 complete: File(s) copied to beegfs") + + logger.info(f"Step 2: Copying {common_path} from data402 to NERSC CFS ({config.bl402_nersc_alsdev_raw.name})") + transfer_controller.copy( + file_path=common_path, source=config.bl402_compute_dtn, destination=config.bl402_nersc_alsdev_raw ) + logger.info("Step 2 complete: File(s) copied to NERSC CFS") # TODO: Ingest file path in SciCat # Waiting for PR #62 to be merged (scicat_controller) @@ -201,14 +236,19 @@ def process_new_402_file_task( # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs - dichroism_settings = Variable.get("dichroism-settings") + logger.info("Step 3: Scheduling pruning from data402 endpoint") + dichroism_settings = Variable.get("dichroism-settings", _sync=True) - prune( - file_path=file_path, - source_endpoint=config.bl402_compute_dtn, - check_endpoint=config.bl402_nersc_alsdev_raw, - days_from_now=dichroism_settings["delete_data402_files_after_days"] # determine appropriate value: currently 6 months - ) + for fp in file_paths: + prune( + file_path=fp, + source_endpoint=config.bl402_compute_dtn, + check_endpoint=config.bl402_nersc_alsdev_raw, + days_from_now=dichroism_settings["delete_data402_files_after_days"], + config=config + ) + + logger.info("Step 3 complete: Pruning from data402 scheduled") # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller) @@ -249,9 +289,15 @@ def move_402_flight_check( @flow(name="new_631_file_flow", flow_run_name="process_new-{file_path}") def process_new_631_file_flow( - file_path: str, + file_path: Union[str, list[str]], config: Optional[ConfigDichroism] = None ) -> None: + """ + process_new_631_file_flow calls the task to process a new file at BL 6.3.1 + :param file_path: Path to the new file(s) to be processed. + :param config: Beamline configuration settings for processing. + :return: None + """ process_new_631_file_task( file_path=file_path, config=config @@ -260,17 +306,17 @@ def process_new_631_file_flow( @task(name="new_631_file_task") def process_new_631_file_task( - file_path: str, + file_path: Union[str, list[str]], config: Optional[ConfigDichroism] = None ) -> None: """ Flow to process a new file at BL 6.3.1 - 1. Copy the file from the data631 to NERSC CFS. Ingest file path in SciCat. + 1. Copy the file(s) from the data631 to NERSC CFS. Ingest file path in SciCat. 2. Schedule pruning from data631. 6 months from now. - 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 3. Copy the file(s) from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. 4. Schedule pruning from NERSC CFS. - :param file_path: Path to the new file to be processed. + :param file_path: Path to the new file(s) to be processed. :param config: Configuration settings for processing. """ From 6e5d3e0edc510ab6c71f8c5634184b937a68b98b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:53:11 -0800 Subject: [PATCH 21/23] Updating bl631 move task to handle a list of files, more verbose logging, and staging on beegfs --- orchestration/flows/dichroism/move.py | 58 ++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index e7585e3..1c89e24 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -183,6 +183,7 @@ def process_new_402_file_task( :param file_path: Path to the new file(s) to be processed. :param config: Configuration settings for processing. + :return: None """ logger = get_run_logger() @@ -249,6 +250,7 @@ def process_new_402_file_task( ) logger.info("Step 3 complete: Pruning from data402 scheduled") + logger.info(f"All steps complete for {len(file_paths)} file(s)") # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller) @@ -262,6 +264,7 @@ def move_402_flight_check( file_path: str = "test_directory/test.txt", ): """Please keep your arms and legs inside the vehicle at all times.""" + logger = get_run_logger() logger.info("402 flight check: testing transfer from data402 to NERSC CFS") config = ConfigDichroism() @@ -294,6 +297,7 @@ def process_new_631_file_flow( ) -> None: """ process_new_631_file_flow calls the task to process a new file at BL 6.3.1 + :param file_path: Path to the new file(s) to be processed. :param config: Beamline configuration settings for processing. :return: None @@ -316,25 +320,54 @@ def process_new_631_file_task( 3. Copy the file(s) from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. 4. Schedule pruning from NERSC CFS. - :param file_path: Path to the new file(s) to be processed. + :param file_path: Path(s) to the new file(s) to be processed. :param config: Configuration settings for processing. + :return: None """ + logger = get_run_logger() - logger.info(f"Processing new 631 file: {file_path}") + # Normalize file_path to a list + if file_path is None: + file_paths = [] + elif isinstance(file_path, str): + file_paths = [file_path] + else: + file_paths = file_path + + if not file_paths: + logger.error("No file_paths provided") + raise ValueError("No file_paths provided") + + logger.info(f"Processing new 631 file(s): {file_paths}") if not config: + logger.info("No config provided, initializing default ConfigDichroism") config = ConfigDichroism() + common_path = get_common_parent_path(file_paths) + logger.info(f"Common parent path: {common_path}") + + logger.info("Initializing Globus transfer controller") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) + logger.info(f"Step 1: Copying {common_path} from data402 to beegfs ({config.bl402_beegfs_raw.name})") transfer_controller.copy( - file_path=file_path, + file_path=common_path, + source=config.bl631_compute_dtn, + destination=config.bl631_beegfs_raw + ) + logger.info("Step 1 complete: File(s) copied to beegfs") + + logger.info(f"Step 2: Copying {common_path} from data402 to NERSC CFS ({config.bl631_nersc_alsdev_raw.name})") + transfer_controller.copy( + file_path=common_path, source=config.bl631_compute_dtn, destination=config.bl631_nersc_alsdev_raw ) + logger.info("Step 2 complete: File(s) copied to NERSC CFS") # TODO: Ingest file path in SciCat # Waiting for PR #62 to be merged (scicat_controller) @@ -342,14 +375,19 @@ def process_new_631_file_task( # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs - dichroism_settings = Variable.get("dichroism-settings") + logger.info("Step 3: Scheduling pruning from data631 endpoint") + dichroism_settings = Variable.get("dichroism-settings", _sync=True) - prune( - file_path=file_path, - source_endpoint=config.bl631_compute_dtn, - check_endpoint=config.bl631_nersc_alsdev_raw, - days_from_now=dichroism_settings["delete_data631_files_after_days"] # determine appropriate value: currently 6 months - ) + for fp in file_paths: + prune( + file_path=fp, + source_endpoint=config.bl631_compute_dtn, + check_endpoint=config.bl631_nersc_alsdev_raw, + days_from_now=dichroism_settings["delete_data631_files_after_days"], + config=config + ) + logger.info("Step 3 complete: Pruning from data631 scheduled") + logger.info(f"All steps complete for {len(file_paths)} file(s)") # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? # Waiting for PR #62 to be merged (transfer_controller) From d80a291abfca3ca7509f7f3534123d9736b13bec Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:56:19 -0800 Subject: [PATCH 22/23] beamline id typo --- orchestration/flows/dichroism/move.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/dichroism/move.py b/orchestration/flows/dichroism/move.py index 1c89e24..8d99e94 100644 --- a/orchestration/flows/dichroism/move.py +++ b/orchestration/flows/dichroism/move.py @@ -353,7 +353,7 @@ def process_new_631_file_task( config=config ) - logger.info(f"Step 1: Copying {common_path} from data402 to beegfs ({config.bl402_beegfs_raw.name})") + logger.info(f"Step 1: Copying {common_path} from data631 to beegfs ({config.bl631_beegfs_raw.name})") transfer_controller.copy( file_path=common_path, source=config.bl631_compute_dtn, @@ -361,7 +361,7 @@ def process_new_631_file_task( ) logger.info("Step 1 complete: File(s) copied to beegfs") - logger.info(f"Step 2: Copying {common_path} from data402 to NERSC CFS ({config.bl631_nersc_alsdev_raw.name})") + logger.info(f"Step 2: Copying {common_path} from data631 to NERSC CFS ({config.bl631_nersc_alsdev_raw.name})") transfer_controller.copy( file_path=common_path, source=config.bl631_compute_dtn, From 2d86c2de82343c259b4a966f8294d3102719ce5c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 16 Jan 2026 14:58:03 -0800 Subject: [PATCH 23/23] Updating pytest to handle the extra move to beegfs --- orchestration/_tests/test_dichroism/test_move.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/orchestration/_tests/test_dichroism/test_move.py b/orchestration/_tests/test_dichroism/test_move.py index 10090b3..d40c57a 100644 --- a/orchestration/_tests/test_dichroism/test_move.py +++ b/orchestration/_tests/test_dichroism/test_move.py @@ -109,7 +109,7 @@ def test_process_new_402_file_task(mocker: MockFixture) -> None: result = process_new_402_file_task(file_path=test_file_path, config=mock_config) # Verify that the transfer controller's copy method was called exactly once. - assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once" @@ -118,7 +118,7 @@ def test_process_new_402_file_task(mocker: MockFixture) -> None: mock_prune.reset_mock() result = process_new_402_file_task(file_path=test_file_path, config=None) - assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once" @@ -179,8 +179,8 @@ def test_process_new_631_file_task(mocker: MockFixture) -> None: # Execute the move flow with the test file path and mock configuration. result = process_new_631_file_task(file_path=test_file_path, config=mock_config) - # Verify that the transfer controller's copy method was called exactly once. - assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + # Verify that the transfer controller's copy method was called exactly twice. + assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once" @@ -189,7 +189,7 @@ def test_process_new_631_file_task(mocker: MockFixture) -> None: mock_prune.reset_mock() result = process_new_631_file_task(file_path=test_file_path, config=None) - assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once"