Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
bac02a4
init commit for bl 9.3.1
davramov Sep 10, 2025
3738106
Adjusting 9.3.1 endpoint configuration to use the compute-dtn globus …
davramov Oct 27, 2025
8d6e834
Adding pytests for bl931 flows
davramov Oct 28, 2025
9b35bd3
Making Config931 optional, since it is initialized within the methods…
davramov Oct 28, 2025
4da35bf
Adding documentation for bl 9.3.1 flows
davramov Oct 28, 2025
19128a8
changing the data description in the documentation
davramov Oct 28, 2025
62fd035
Making the move flow call the move task, and updating the pytest/disp…
davramov Nov 6, 2025
8948b1d
Adjusting endpoint names in the test movement flow
davramov Dec 3, 2025
560259a
Ensuring get_run_logger() is set in the flight check
davramov Dec 3, 2025
d395dcd
if the transfer test fails, it should throw a runtime error
davramov Dec 3, 2025
ffcb3b4
updating init_work_pools to handle when beamline id is a number (e.g.…
davramov Dec 4, 2025
0058581
adjusting the test directory path for 931
davramov Dec 8, 2025
965df42
Pinning fastapi==0.116.1; breaks with fastapi==0.124.0
davramov Dec 8, 2025
4832eb8
Switching JSON Blocks to Variable Blocks
davramov Dec 17, 2025
d925c89
Adding flow diagram to 9.3.1 documentation
davramov Dec 23, 2025
4236517
Updating flight scheck to move globus_test/test.txt
davramov Jan 14, 2026
fc437c6
removing commented out code
davramov Jan 14, 2026
c1689fc
adding _sync=True to Variable.get() call and added missing flow_run_n…
davramov Jan 14, 2026
ccaad74
Adding verbose logging to process_new_931_file_task
davramov Jan 14, 2026
79f280f
Adding transfer_success RuntimeError if False
davramov Jan 14, 2026
4b24dcb
Fixing the deployment name when scheduling prune_globus_endpoint
davramov Jan 14, 2026
8fa4939
Adding beegfs endpoint for 931
davramov Jan 14, 2026
b596ea0
Adding placeholder for transfer to beegfs
davramov Jan 14, 2026
1f6d976
reorganizing config.yml to be in numerical order by beamline number
davramov Jan 14, 2026
11a185e
Adding placeholder for 9.3.1 scicat ingestion
davramov Jan 14, 2026
87b11dc
removing config from the schedule_prefect_flow call due to serialization
davramov Jan 14, 2026
ac14b71
copy data to beegfs as part of the flow
davramov Jan 14, 2026
a2590c4
Updating 931 pytest to check for two transfers (1 to beegfs, another …
davramov Jan 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,26 @@ globus:
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: nersc832

# 9.3.1 ENDPOINTS

bl931-beegfs-data:
root_path: /beamline_staging/bl931/raw/
uri: beegfs.als.lbl.gov
uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a
name: bl931-beegfs-data

bl931-compute-dtn:
root_path: /
uri: compute-dtn.als.lbl.gov
uuid: 23af478e-d459-4e78-9753-5091b5fb432a
name: bl931-compute-dtn

bl931-nersc_alsdev_raw:
root_path: /global/cfs/cdirs/als/data_mover/9.3.1/raw
uri: nersc.gov
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
name: bl931-nersc_alsdev_raw

globus_apps:
als_transfer:
client_id: ${GLOBUS_CLIENT_ID}
Expand Down
102 changes: 102 additions & 0 deletions docs/mkdocs/docs/bl931.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Beamline 9.3.1 Flows

This page documents the workflows supported by Splash Flows at [ALS Beamline 9.3.1 (Tender X-ray Spectroscopy)](https://als.lbl.gov/beamlines/9-3-1/).

## Data at 9.3.1

Generates spectroscopy data.

## File Watcher

There is a file watcher on the acquisition system that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps:
- Copy scans in real time from a Globus collection on the `compute-dtn` server to `NERSC CFS` using Globus Transfer.
- Copy project data to `NERSC HPSS` for long-term storage (TBD).
- Analysis on HPC systems (TBD).
- Ingest into SciCat (TBD).
- Schedule data pruning from `compute-dtn` and `NERSC CFS`.

## Prefect Configuration

### Registered Flows

#### `dispatcher.py`

The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. Once a new file is written, the `dispatcher()` Flow is called. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code).

#### `move.py`

Flow to process a new file at BL 9.3.1
1. Copy the file from `compute-dtn` to `NERSC CFS` and ingest the file path and metadata into SciCat.
2. Schedule pruning from `compute-dtn`.
3. Copy the file from `NERSC CFS` to `NERSC HPSS`. Ingest the archived file path in SciCat.
4. Schedule pruning from `NERSC CFS`.

## VM Details

The computing backend runs on a VM in the B15 server room that is managed by ALS IT staff.

`flow-931.als.lbl.gov`

## Flow Diagram

```mermaid
sequenceDiagram
participant DET as Detector/<br/>File Watcher
participant DISP as Prefect<br/>Dispatcher
participant DTN as compute-dtn<br/>Storage
participant GLOB as Globus<br/>Transfer
participant CFS as NERSC<br/>CFS
participant CAT as SciCat<br/>Metadata
participant HPSS as NERSC<br/>HPSS

%% Initial Trigger
DET->>DET: Monitor filesystem
DET->>DISP: Trigger on new file
DISP->>DISP: Coordinate flows

%% Flow 1: new_file_931
rect rgb(220, 230, 255)
note over DISP,CAT: FLOW 1: new_file_931
DISP->>GLOB: Init transfer
activate GLOB
GLOB->>DTN: Read from compute-dtn
DTN-->>GLOB: Data
GLOB->>CFS: Write to NERSC CFS
GLOB-->>DISP: Transfer complete
deactivate GLOB

DISP->>CAT: Register metadata (TBD)
end

%% Flow 2: HPSS Archive
rect rgb(220, 255, 230)
note over DISP,HPSS: FLOW 2: HPSS Archive (TBD)
DISP->>GLOB: Init archive transfer
activate GLOB
GLOB->>CFS: Read from CFS
CFS-->>GLOB: Data
GLOB->>HPSS: Write to tape
GLOB-->>DISP: Archive complete
deactivate GLOB

DISP->>CAT: Update metadata (TBD)
end

%% Flow 3: Scheduled Pruning
rect rgb(255, 255, 220)
note over DISP,CFS: FLOW 3: Scheduled Pruning
DISP->>DISP: Schedule prune tasks

DISP->>DTN: Prune from compute-dtn
activate DTN
DTN->>DTN: Delete expired data
DTN-->>DISP: Pruning complete
deactivate DTN

DISP->>CFS: Prune from CFS (after HPSS)
activate CFS
CFS->>CFS: Delete expired data
CFS-->>DISP: Pruning complete
deactivate CFS
end
```
6 changes: 4 additions & 2 deletions docs/mkdocs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ nav:
- Getting Started: getting_started.md
- Beamline Implementations:
- 7.3.3 SAXS/WAXS/GISAXS: bl733.md
- 8.3.2 Micro Tomography - Compute at ALCF: alcf832.md
- 8.3.2 Micro Tomography - Compute at NERSC: nersc832.md
- Beamline 8.3.2 - Microtomography:
- Compute at ALCF: alcf832.md
- Compute at NERSC: nersc832.md
- Beamline 9.3.1 - Tender X-ray Spectroscopy: bl931.md
- Orchestration: orchestration.md
- Configuration: configuration.md
# - Troubleshooting: troubleshooting.md
Expand Down
9 changes: 7 additions & 2 deletions init_work_pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ def check_env() -> tuple[str, str, str]:
"""Validate required environment variables and paths."""
beamline = os.environ.get("BEAMLINE")
if not beamline:
logger.error("Must set BEAMLINE (e.g., 832, 733)")
logger.error("Must set BEAMLINE (e.g., 832, 733, dichroism)")
sys.exit(1)

prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
# Check if the beamline identifier is a number or a string to get the correct flows folder name
if beamline.isdigit():
prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
else:
prefect_yaml = f"orchestration/flows/{beamline}/prefect.yaml"

if not os.path.isfile(prefect_yaml):
logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!")
sys.exit(1)
Expand Down
Empty file.
205 changes: 205 additions & 0 deletions orchestration/_tests/test_bl931/test_move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
'''Pytest unit tests for BL931 move flow. '''

import logging
import pytest
from uuid import uuid4

from prefect.testing.utilities import prefect_test_harness
from prefect.blocks.system import Secret
from prefect.variables import Variable
from pytest_mock import MockFixture

from orchestration._tests.test_transfer_controller import MockSecret
from orchestration.flows.bl931.config import Config931

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
"""
A pytest fixture that automatically sets up and tears down the Prefect test harness
for the entire test session. It creates and saves test secrets and configurations
required for Globus integration.

Yields:
None
"""
with prefect_test_harness():
globus_client_id = Secret(value=str(uuid4()))
globus_client_id.save(name="globus-client-id", overwrite=True)

globus_client_secret = Secret(value=str(uuid4()))
globus_client_secret.save(name="globus-client-secret", overwrite=True)

Variable.set(
name="globus-settings",
value={"max_wait_seconds": 600},
overwrite=True,
_sync=True
)

Variable.set(
name="bl931-settings",
value={
"delete_data931_files_after_days": 180
},
overwrite=True,
_sync=True
)

yield


# ----------------------------
# Tests for 931
# ----------------------------

def test_process_new_931_file_task(mocker: MockFixture) -> None:
"""
Test the process_new_931_file flow from orchestration.flows.bl931.move.

This test verifies that:
- The get_transfer_controller function is called (patched) with the correct parameters.
- The returned transfer controller's copy method is called with the expected file path,
source, and destination endpoints from the provided configuration.

Parameters:
mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
"""
# Import the flow to test.
from orchestration.flows.bl931.move import process_new_931_file_task

# Patch the Secret.load and init_transfer_client in the configuration context.
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
mocker.patch(
"orchestration.flows.bl931.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
)
# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch(
"orchestration.flows.bl931.move.schedule_prefect_flow",
return_value=None
)

# Instantiate the dummy configuration.
mock_config = Config931()

# Generate a test file path.
test_file_path = f"/tmp/test_file_{uuid4()}.txt"

# Create a mock transfer controller with a mocked 'copy' method.
mock_transfer_controller = mocker.MagicMock()
mock_transfer_controller.copy.return_value = True

mock_prune = mocker.patch(
"orchestration.flows.bl931.move.prune",
return_value=None
)

# Patch get_transfer_controller where it is used in process_new_931_file_task.
mocker.patch(
"orchestration.flows.bl931.move.get_transfer_controller",
return_value=mock_transfer_controller
)

# Execute the move flow with the test file path and mock configuration.
result = process_new_931_file_task(file_path=test_file_path, config=mock_config)

# Verify that the transfer controller's copy method was called exactly once.
assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
assert result is None, "The flow should return None"
assert mock_prune.call_count == 1, "Prune function should be called exactly once"

# Reset mocks and test with config=None
mock_transfer_controller.copy.reset_mock()
mock_prune.reset_mock()

result = process_new_931_file_task(file_path=test_file_path, config=None)
assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
assert result is None, "The flow should return None"
assert mock_prune.call_count == 1, "Prune function should be called exactly once"


def test_dispatcher_931_flow(mocker: MockFixture) -> None:
"""
Test the dispatcher flow for BL931.

This test verifies that:
- The process_new_931_file_task function is called with the correct parameters
when the dispatcher flow is executed.
Parameters:
mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
"""
# Import the dispatcher flow to test.
from orchestration.flows.bl931.dispatcher import dispatcher

# Create a mock configuration object.
class MockConfig:
pass

mock_config = MockConfig()

# Generate a test file path.
test_file_path = f"/tmp/test_file_{uuid4()}.txt"

# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
mocker.patch(
"orchestration.flows.bl931.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
)
# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch(
"orchestration.flows.bl931.move.schedule_prefect_flow",
return_value=None
)

# Patch the process_new_931_file_task function to monitor its calls.
mock_process_new_931_file_task = mocker.patch(
"orchestration.flows.bl931.dispatcher.process_new_931_file_task",
return_value=None
)

# Execute the dispatcher flow with test parameters.
dispatcher(
file_path=test_file_path,
is_export_control=False,
config=mock_config
)

# Verify that process_new_931_file_task was called exactly once with the expected arguments.
mock_process_new_931_file_task.assert_called_once_with(
file_path=test_file_path,
config=mock_config
)

# Verify that process_new_931_file_task is called even when config is None
mock_process_new_931_file_task.reset_mock()
dispatcher(
file_path=test_file_path,
is_export_control=False,
config=None
)
mock_process_new_931_file_task.assert_called_once()

# Test error handling for missing file_path
mock_process_new_931_file_task.reset_mock()
with pytest.raises(ValueError):
dispatcher(
file_path=None,
is_export_control=False,
config=mock_config
)
mock_process_new_931_file_task.assert_not_called()

# Test error handling for export control flag
mock_process_new_931_file_task.reset_mock()
with pytest.raises(ValueError):
dispatcher(
file_path=test_file_path,
is_export_control=True,
config=mock_config
)
mock_process_new_931_file_task.assert_not_called()
Empty file.
14 changes: 14 additions & 0 deletions orchestration/flows/bl931/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from globus_sdk import TransferClient
from orchestration.globus import transfer


# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged)
class Config931:
def __init__(self) -> None:
config = transfer.get_config()
self.endpoints = transfer.build_endpoints(config)
self.apps = transfer.build_apps(config)
self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
self.bl931_compute_dtn = self.endpoints["bl931-compute-dtn"]
self.bl931_nersc_alsdev_raw = self.endpoints["bl931-nersc_alsdev_raw"]
self.bl931_beegfs = self.endpoints["bl931-beegfs-data"]
Loading