From 78e3f4dbd72587fb10ba57e2c68cccd34fe25e1c Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 10:12:15 +0000 Subject: [PATCH 01/17] Write daily CSN summary with some parquet stats to allow the flowsheet query to be run retrospectively. --- src/locations.py | 1 + src/pipeline/Snakefile | 166 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 152 insertions(+), 15 deletions(-) diff --git a/src/locations.py b/src/locations.py index a961a17..0e9d12b 100644 --- a/src/locations.py +++ b/src/locations.py @@ -3,6 +3,7 @@ WAVEFORM_EXPORT_BASE = Path("/waveform-export") WAVEFORM_ORIGINAL_CSV = WAVEFORM_EXPORT_BASE / "original-csv" WAVEFORM_ORIGINAL_PARQUET = WAVEFORM_EXPORT_BASE / "original-parquet" +WAVEFORM_HASH_LOOKUPS = WAVEFORM_EXPORT_BASE / "hash-lookups" WAVEFORM_PSEUDONYMISED_PARQUET = WAVEFORM_EXPORT_BASE / "pseudonymised" WAVEFORM_SNAKEMAKE_LOGS = WAVEFORM_EXPORT_BASE / "snakemake-logs" WAVEFORM_FTPS_LOGS = WAVEFORM_EXPORT_BASE / "ftps-logs" diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 8d88fa7..15476a0 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -1,22 +1,28 @@ import json import time from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pyarrow.parquet as pq from snakemake.io import glob_wildcards + from exporter.ftps import do_upload from locations import ( + WAVEFORM_HASH_LOOKUPS, WAVEFORM_ORIGINAL_CSV, WAVEFORM_SNAKEMAKE_LOGS, WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, + ORIGINAL_PARQUET_PATTERN, FILE_STEM_PATTERN, FILE_STEM_PATTERN_HASHED, CSV_PATTERN, make_file_name, ) -from pathlib import Path from pseudon.hashing import do_hash from pseudon.pseudon import csv_to_parquets + def get_file_age(file_path: Path) -> timedelta: # need to use UTC to avoid DST issues file_time_utc = datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc) @@ -43,8 +49,56 @@ CSV_WAIT_TIME = timedelta(minutes=5) # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. + +class InputCsvFile: + """Represent the different files in the pipeline from the point of view of one + csn + day + variable + channel combination (ie. one "original CSV" file). + These files are glued together by the Snakemake rules. 
+ """ + def __init__(self, + date: str, + csn: str, + variable_id: str, + channel_id: str, + units: str): + self.date = date + self.csn = csn + self.hashed_csn = hash_csn(csn) + self.variable_id = variable_id + self.channel_id = channel_id + self.units = units + self._subs_dict = dict( + date=self.date, + csn=self.csn, + hashed_csn=self.hashed_csn, + variable_id=self.variable_id, + channel_id=self.channel_id, + units=self.units + ) + + def get_subs_dict(self): + return self._subs_dict + + def get_original_csv_path(self) -> Path: + return Path(make_file_name(str(CSV_PATTERN), self.get_subs_dict())) + + def get_original_parquet_path(self) -> Path: + return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self.get_subs_dict())) + + def get_pseudonymised_parquet_path(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self.get_subs_dict()) + return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" + + def get_ftps_uploaded_file(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self.get_subs_dict()) + return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") + + def get_daily_hash_lookup(self) -> Path: + return WAVEFORM_HASH_LOOKUPS / f"{self.date}.hashes.json" + + def determine_eventual_outputs(): - # Parse all CSVs using the basic file name pattern + # Discover all CSVs using the basic file name pattern before = time.perf_counter() all_wc = glob_wildcards(CSV_PATTERN) @@ -59,15 +113,8 @@ def determine_eventual_outputs(): _all_outputs = [] for date, csn, variable_id, channel_id, units \ in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units): - subs_dict = dict( - date = date, - csn = csn, - hashed_csn = hash_csn(csn), - variable_id = variable_id, - channel_id = channel_id, - units = units - ) - orig_file = Path(make_file_name(str(CSV_PATTERN), subs_dict)) + input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) + orig_file = input_file_obj.get_original_csv_path() if csn == 'unmatched_csn': print(f"Skipping file with unmatched CSN: {orig_file}") continue @@ -75,15 +122,15 @@ def determine_eventual_outputs(): if file_age < CSV_WAIT_TIME: print(f"File too new (age={file_age}): {orig_file}") continue - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, subs_dict) - final_output_file = WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") - _all_outputs.append(final_output_file) + _all_outputs.append(input_file_obj) after = time.perf_counter() print(f"Calculated output files using newness threshold {CSV_WAIT_TIME} in {after - before} seconds") return _all_outputs, _hash_to_csn all_outputs, hash_to_csn = determine_eventual_outputs() +ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] +ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) def configure_file_logging(log_file): import logging @@ -98,9 +145,51 @@ def configure_file_logging(log_file): return logger +def parquet_min_max_value(parquet_path: Path, column_name): + """By the magic of parquet files we can get the min/max timestamps without loading it all into + memory or even reading every row.""" + parquet_file = pq.ParquetFile(parquet_path) + column_index = parquet_file.schema_arrow.get_field_index(column_name) + if column_index == -1: + raise ValueError(f"Column '{column_name}' not found in {parquet_path}") + + lowest_min = None + highest_max = None + + metadata = parquet_file.metadata + if metadata.num_rows == 0: + return None, None + + + # each row group will have its own 
min/max, so take the min of mins and the max of maxes + for row_group_index in range(metadata.num_row_groups): + column_meta = metadata.row_group(row_group_index).column(column_index) + column_stats = column_meta.statistics + # We created the parquets so we know they have up-to-date statistics. + # We have already checked the file is not empty (which causes empty stats), so treat missing + # statistics as an invalid file. + if column_stats is None or not column_stats.has_min_max: + raise ValueError(f"columns stats missing or min_max missing: {column_stats}") + if lowest_min is None or column_stats.min < lowest_min: + lowest_min = column_stats.min + if highest_max is None or column_stats.max > highest_max: + highest_max = column_stats.max + + return lowest_min, highest_max + + rule all: input: - all_outputs + ftps_uploaded = ALL_FTPS_UPLOADED, + daily_hash_lookups = ALL_DAILY_HASH_LOOKUPS + +rule all_ftps_uploaded: + input: + ALL_FTPS_UPLOADED + +rule all_daily_hash_lookups: + input: + ALL_DAILY_HASH_LOOKUPS def input_file_maker(wc): unhashed_csn = hash_to_csn[wc.hashed_csn] @@ -139,6 +228,53 @@ rule csv_to_parquet: units=wildcards.units) +def pseudonymised_parquet_files_for_date(wc): + return [ao.get_pseudonymised_parquet_path() for ao in all_outputs if ao.date == wc.date] + + +rule make_daily_hash_lookup: + input: + # we declare the pseud parquets here to make the DAG work, but + # then later we "cheat" and use InputCsvFile objects to get the original CSN + pseudonymised_parquets = pseudonymised_parquet_files_for_date + output: + hash_lookup_json = WAVEFORM_HASH_LOOKUPS / "{date}.hashes.json" + run: + daily_files = [ao for ao in all_outputs if ao.date == wildcards.date] + print( + f"Making daily hash lookup {output.hash_lookup_json} from {len(daily_files)} files: " + f"{input.pseudonymised_parquets}" + ) + min_timestamp_key = 'min_timestamp' + max_timestamp_key = 'max_timestamp' + hash_summary_by_csn = {} + for daily_file in daily_files: + entry = {} + original_parquet = daily_file.get_original_parquet_path() + entry["csn"] = daily_file.csn + entry["hashed_csn"] = daily_file.hashed_csn + min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp") + if min_timestamp is None or max_timestamp is None: + # do not contribute to stats + print(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}") + break + entry[min_timestamp_key] = min_timestamp + entry[max_timestamp_key] = max_timestamp + existing_entry = hash_summary_by_csn.get(daily_file.csn) + if existing_entry is None: + hash_summary_by_csn[daily_file.csn] = entry + else: + # update the limits (there can be multiple files for the same CSN because each variable/channel + # is in its own file) + existing_entry[min_timestamp_key] = min(min_timestamp, existing_entry[min_timestamp_key]) + existing_entry[max_timestamp_key] = max(max_timestamp, existing_entry[max_timestamp_key]) + + hash_summary = list(hash_summary_by_csn.values()) + + with open(output.hash_lookup_json, "w") as fh: + json.dump(hash_summary, fh, indent=0) + + rule send_ftps: input: WAVEFORM_PSEUDONYMISED_PARQUET / (FILE_STEM_PATTERN_HASHED + ".parquet") From 5c237c9bd99eae35c426258fd94e04cd0199b54c Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 17:57:59 +0000 Subject: [PATCH 02/17] Add snakemake workflow integration test --- .github/workflows/pytest.yml | 4 + .gitignore | 4 + exporter-scripts/scheduled-script.sh | 1 + pyproject.toml | 5 ++ tests/test_snakemake_integration.py | 109 +++++++++++++++++++++++++++ 
5 files changed, 123 insertions(+) create mode 100644 tests/test_snakemake_integration.py diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index cc5aa34..806260c 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -40,6 +40,10 @@ jobs: working-directory: waveform-controller run: uv pip install '.[dev]' + - name: Build exporter image + working-directory: waveform-controller + run: docker compose -f docker-compose.yml build waveform-exporter + - name: Run the tests working-directory: waveform-controller run: uv run pytest tests diff --git a/.gitignore b/.gitignore index be3f5ce..cc2903b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,7 @@ wheels/ # settings files (should not be in the source tree anyway, but just in case) *.env + + +# pytest tmp paths +.pytest_tmp diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh index 6faa7be..40b2887 100755 --- a/exporter-scripts/scheduled-script.sh +++ b/exporter-scripts/scheduled-script.sh @@ -30,3 +30,4 @@ snakemake --snakefile /app/src/pipeline/Snakefile \ ret_code=$? set -e echo "$0: snakemake exited with return code $ret_code" +exit $ret_code diff --git a/pyproject.toml b/pyproject.toml index 7ee53db..17f395e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,5 +25,10 @@ emap-extract-waveform = "controller:receiver" emap-csv-pseudon = "pseudon.pseudon:psudon_cli" emap-send-ftps = "exporter.ftps:do_upload_cli" +[tool.pytest.ini_options] +# Force temp dirs under the repo so Docker can mount them on macOS. +# The default under /private/var/folders seems to silently fail (gives you an empty directory) +addopts = "--basetemp=.pytest_tmp" + [tool.mypy] ignore_missing_imports = true diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py new file mode 100644 index 0000000..3b7df80 --- /dev/null +++ b/tests/test_snakemake_integration.py @@ -0,0 +1,109 @@ +import json +import os +import shutil +import subprocess +import time +from pathlib import Path + +import pytest + +from src.pseudon.hashing import do_hash + + +def _run_compose(compose_file: Path, args: list[str], cwd: Path) -> subprocess.CompletedProcess: + cmd = ["docker", "compose", "-f", str(compose_file), *args] + return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True) + + +@pytest.fixture(scope="session", autouse=True) +def build_exporter_image(): + repo_root = Path(__file__).resolve().parents[1] + compose_file = repo_root / "docker-compose.yml" + result = _run_compose(compose_file, ["build", "waveform-exporter"], cwd=repo_root) + if result.returncode != 0: + pytest.fail( + "docker compose build waveform-exporter failed\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + +def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): + repo_root = Path(__file__).resolve().parents[1] + compose_file = repo_root / "docker-compose.yml" + + date = "2025-01-01" + # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later! 
+ csn = "SECRET_CSN_1234" + mrn = "SECRET_MRN_12345" + loc = "SECRET_LOCATION_123" + variable_id = "11" + channel_id = "3" + units = "uV" + + original_csv_dir = tmp_path / "original-csv" + original_csv_dir.mkdir(parents=True, exist_ok=True) + csv_path = ( + original_csv_dir / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.csv" + ) + csv_path.write_text( + "csn,mrn,source_variable_id,source_channel_id,units,sampling_rate,timestamp,location,values\n" + f"{csn},{mrn},{variable_id},{channel_id},{units},100,1769795156.0,{loc},\"[1.0,2.0]\"\n" + f"{csn},{mrn},{variable_id},{channel_id},{units},100,1769795157.0,{loc},\"[3.0, 4.0]\"\n" + ) + # The test input CSV file needs to be old enough so that snakemake doesn't skip it + old_time = time.time() - (10 * 60) + os.utime(csv_path, (old_time, old_time)) + + compose_args = [ + "run", + "--rm", + # we override the volume defined in the compose file to be the pytest tmp path + "-v", + f"{tmp_path}:/waveform-export", + "--entrypoint", + "/app/exporter-scripts/scheduled-script.sh", + "-e", + "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups", + "-e", + "SNAKEMAKE_CORES=1", + "waveform-exporter", + ] + result = _run_compose( + compose_file, + compose_args, + cwd=repo_root, + ) + # for convenience print the snakemake log files if they exist (on success or error) + outer_logs_dir = tmp_path / "snakemake-logs" + outer_logs = sorted(outer_logs_dir.glob("snakemake-outer-log*.log")) + if not outer_logs: + print("No outer logs found") + for ol in outer_logs: + print(f"Log file {ol}:") + print(ol.read_text()) + if result.returncode != 0: + pytest.fail( + "docker compose run waveform-exporter failed\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + expected_hashed_csn = do_hash("csn", csn) + pseudon_path = ( + tmp_path + / "pseudonymised" + / f"{date}.{expected_hashed_csn}.{variable_id}.{channel_id}.{units}.parquet" + ) + hash_lookup_path = tmp_path / "hash-lookups" / f"{date}.hashes.json" + + assert pseudon_path.exists() + assert hash_lookup_path.exists() + + # does our CSN -> hashed_csn + hash_lookup = json.loads(hash_lookup_path.read_text()) + assert isinstance(hash_lookup, list) + assert any( + entry.get("csn") == csn and entry.get("hashed_csn") == expected_hashed_csn + for entry in hash_lookup + ) From 0ac4759392a82689aae9afe164298888981150f3 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 18:23:56 +0000 Subject: [PATCH 03/17] Ensure sensitive fields have been removed from parquets --- tests/test_snakemake_integration.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 3b7df80..610a0cd 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -1,6 +1,6 @@ import json import os -import shutil +import pyarrow.parquet as pq import subprocess import time from pathlib import Path @@ -14,6 +14,8 @@ def _run_compose(compose_file: Path, args: list[str], cwd: Path) -> subprocess.C cmd = ["docker", "compose", "-f", str(compose_file), *args] return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True) +EXPECTED_COLUMN_NAMES = ["csn", "mrn", "source_variable_id", "source_channel_id", "units", + "sampling_rate", "timestamp", "location", "values"] @pytest.fixture(scope="session", autouse=True) def build_exporter_image(): @@ -47,7 +49,7 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): original_csv_dir / 
f"{date}.{csn}.{variable_id}.{channel_id}.{units}.csv" ) csv_path.write_text( - "csn,mrn,source_variable_id,source_channel_id,units,sampling_rate,timestamp,location,values\n" + ",".join(EXPECTED_COLUMN_NAMES) + "\n" f"{csn},{mrn},{variable_id},{channel_id},{units},100,1769795156.0,{loc},\"[1.0,2.0]\"\n" f"{csn},{mrn},{variable_id},{channel_id},{units},100,1769795157.0,{loc},\"[3.0, 4.0]\"\n" ) @@ -90,6 +92,11 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): ) expected_hashed_csn = do_hash("csn", csn) + original_parquet_path = ( + tmp_path + / "original-parquet" + / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.parquet" + ) pseudon_path = ( tmp_path / "pseudonymised" @@ -97,6 +104,7 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): ) hash_lookup_path = tmp_path / "hash-lookups" / f"{date}.hashes.json" + assert original_parquet_path.exists() assert pseudon_path.exists() assert hash_lookup_path.exists() @@ -107,3 +115,17 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): entry.get("csn") == csn and entry.get("hashed_csn") == expected_hashed_csn for entry in hash_lookup ) + _check_parquet(pseudon_path, allow_no_secrets=True) + _check_parquet(original_parquet_path, allow_no_secrets=False) + + +def _check_parquet(parquet_path: Path, allow_no_secrets=True): + parquet_file = pq.ParquetFile(parquet_path) + column_names = parquet_file.schema_arrow.names + assert column_names == EXPECTED_COLUMN_NAMES + reader = parquet_file.read() + for column_name in column_names: + all_values = reader[column_name].combine_chunks() + if allow_no_secrets: + assert not any(('SECRET' in str(v) for v in all_values)), \ + f"{all_values} contains SECRET string" From 050a225eacaef7ad057ef9402b4c371f8dad7cf8 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 18:26:54 +0000 Subject: [PATCH 04/17] Linting fixes --- tests/test_snakemake_integration.py | 38 +++++++++++++++++++---------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 610a0cd..b30d059 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -10,12 +10,25 @@ from src.pseudon.hashing import do_hash -def _run_compose(compose_file: Path, args: list[str], cwd: Path) -> subprocess.CompletedProcess: +def _run_compose( + compose_file: Path, args: list[str], cwd: Path +) -> subprocess.CompletedProcess: cmd = ["docker", "compose", "-f", str(compose_file), *args] return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True) -EXPECTED_COLUMN_NAMES = ["csn", "mrn", "source_variable_id", "source_channel_id", "units", - "sampling_rate", "timestamp", "location", "values"] + +EXPECTED_COLUMN_NAMES = [ + "csn", + "mrn", + "source_variable_id", + "source_channel_id", + "units", + "sampling_rate", + "timestamp", + "location", + "values", +] + @pytest.fixture(scope="session", autouse=True) def build_exporter_image(): @@ -45,13 +58,11 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): original_csv_dir = tmp_path / "original-csv" original_csv_dir.mkdir(parents=True, exist_ok=True) - csv_path = ( - original_csv_dir / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.csv" - ) + csv_path = original_csv_dir / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.csv" csv_path.write_text( ",".join(EXPECTED_COLUMN_NAMES) + "\n" - f"{csn},{mrn},{variable_id},{channel_id},{units},100,1769795156.0,{loc},\"[1.0,2.0]\"\n" - 
f"{csn},{mrn},{variable_id},{channel_id},{units},100,1769795157.0,{loc},\"[3.0, 4.0]\"\n" + f'{csn},{mrn},{variable_id},{channel_id},{units},100,1769795156.0,{loc},"[1.0,2.0]"\n' + f'{csn},{mrn},{variable_id},{channel_id},{units},100,1769795157.0,{loc},"[3.0, 4.0]"\n' ) # The test input CSV file needs to be old enough so that snakemake doesn't skip it old_time = time.time() - (10 * 60) @@ -93,9 +104,9 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): expected_hashed_csn = do_hash("csn", csn) original_parquet_path = ( - tmp_path - / "original-parquet" - / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.parquet" + tmp_path + / "original-parquet" + / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.parquet" ) pseudon_path = ( tmp_path @@ -127,5 +138,6 @@ def _check_parquet(parquet_path: Path, allow_no_secrets=True): for column_name in column_names: all_values = reader[column_name].combine_chunks() if allow_no_secrets: - assert not any(('SECRET' in str(v) for v in all_values)), \ - f"{all_values} contains SECRET string" + assert not any( + ("SECRET" in str(v) for v in all_values) + ), f"{all_values} in column {column_name} contains SECRET string" From 2b2144febeb91db4648e87755012a860ac71241a Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 18:43:50 +0000 Subject: [PATCH 05/17] Create config file which won't exist on GHA --- tests/test_snakemake_integration.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index b30d059..8e9873e 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -47,6 +47,13 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): repo_root = Path(__file__).resolve().parents[1] compose_file = repo_root / "docker-compose.yml" + exporter_config_file = repo_root.parent / "config" / "exporter.env" + # This is mainly needed for Github actions because the docker-compose file requires the + # config file to exist, but be gentle because this might also be your development area! + # Empty is fine, actual config is passed through on the command line later. + exporter_config_file.parent.mkdir(exist_ok=True) + exporter_config_file.touch() + date = "2025-01-01" # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later! 
csn = "SECRET_CSN_1234" From 04bb57b4cea6933a8fbff0f433a1756f06bcbc8a Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 19:31:16 +0000 Subject: [PATCH 06/17] Compare orig and pseudon parquets to see that they're identical except for where hashing should have occurred --- tests/test_snakemake_integration.py | 60 ++++++++++++++++++----------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 8e9873e..f008324 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -1,5 +1,7 @@ import json import os +import re + import pyarrow.parquet as pq import subprocess import time @@ -35,12 +37,8 @@ def build_exporter_image(): repo_root = Path(__file__).resolve().parents[1] compose_file = repo_root / "docker-compose.yml" result = _run_compose(compose_file, ["build", "waveform-exporter"], cwd=repo_root) - if result.returncode != 0: - pytest.fail( - "docker compose build waveform-exporter failed\n" - f"stdout:\n{result.stdout}\n" - f"stderr:\n{result.stderr}" - ) + print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") + result.check_returncode() def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): @@ -102,12 +100,9 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): for ol in outer_logs: print(f"Log file {ol}:") print(ol.read_text()) - if result.returncode != 0: - pytest.fail( - "docker compose run waveform-exporter failed\n" - f"stdout:\n{result.stdout}\n" - f"stderr:\n{result.stderr}" - ) + # print all output then raise if there was an error + print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") + result.check_returncode() expected_hashed_csn = do_hash("csn", csn) original_parquet_path = ( @@ -133,18 +128,37 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): entry.get("csn") == csn and entry.get("hashed_csn") == expected_hashed_csn for entry in hash_lookup ) - _check_parquet(pseudon_path, allow_no_secrets=True) - _check_parquet(original_parquet_path, allow_no_secrets=False) + _check_parquets(original_parquet_path, pseudon_path) -def _check_parquet(parquet_path: Path, allow_no_secrets=True): - parquet_file = pq.ParquetFile(parquet_path) - column_names = parquet_file.schema_arrow.names +def _check_parquets(original_parquet_path: Path, pseudon_parquet_path: Path): + # columns where we expect the values to differ due to pseudonymisation + COLUMN_EXPECT_DIFFERENT = ["csn", "mrn", "location"] + orig_parquet_file = pq.ParquetFile(original_parquet_path) + pseudon_parquet_file = pq.ParquetFile(pseudon_parquet_path) + column_names = orig_parquet_file.schema_arrow.names assert column_names == EXPECTED_COLUMN_NAMES - reader = parquet_file.read() + assert column_names == pseudon_parquet_file.schema_arrow.names + orig_reader = orig_parquet_file.read() + pseudon_reader = pseudon_parquet_file.read() for column_name in column_names: - all_values = reader[column_name].combine_chunks() - if allow_no_secrets: - assert not any( - ("SECRET" in str(v) for v in all_values) - ), f"{all_values} in column {column_name} contains SECRET string" + orig_all_values = orig_reader[column_name].combine_chunks() + pseudon_all_values = pseudon_reader[column_name].combine_chunks() + # pseudonymised contains no secrets + assert not any( + ("SECRET" in str(v) for v in pseudon_all_values) + ), f"{pseudon_all_values} in column {column_name} contains SECRET string" + if column_name not in COLUMN_EXPECT_DIFFERENT: + # 
no pseudon expected, should be identical + assert orig_all_values == pseudon_all_values + else: + # pseudon expected, check that it looks like a hash + assert all( + # will need lengthening when we use real hashes! + re.match(r"[a-f0-9]{8}$", str(v)) + for v in pseudon_all_values + ), f"{pseudon_all_values} in column {column_name} does not appear to be a hash" + # orig, all sensitive values contain SECRET + assert all( + "SECRET" in str(v) for v in orig_all_values + ), f"{orig_all_values} in column {column_name} contains SECRET string" From dfce7d0be4feb364580273e70702fc8558db0931 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 21:28:18 +0000 Subject: [PATCH 07/17] Cache the docker build --- .github/workflows/pytest.yml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 806260c..bc7a780 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -40,9 +40,18 @@ jobs: working-directory: waveform-controller run: uv pip install '.[dev]' - - name: Build exporter image - working-directory: waveform-controller - run: docker compose -f docker-compose.yml build waveform-exporter + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build exporter image (cached) + uses: docker/bake-action@v6 + with: + workdir: waveform-controller + files: docker-compose.yml + targets: waveform-exporter + set: | + *.cache-from=type=gha + *.cache-to=type=gha,mode=max - name: Run the tests working-directory: waveform-controller From 518f6d2caa47ac92bbb3816dca53de56c504aab0 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 21:37:02 +0000 Subject: [PATCH 08/17] Apparently buildx action requires env_file to exist --- .github/workflows/pytest.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index bc7a780..ea05080 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -40,6 +40,14 @@ jobs: working-directory: waveform-controller run: uv pip install '.[dev]' + - name: Prepare config env files for compose + working-directory: waveform-controller + run: | + mkdir -p ../config + cp config.EXAMPLE/exporter.env.EXAMPLE ../config/exporter.env + cp config.EXAMPLE/hasher.env.EXAMPLE ../config/hasher.env + cp config.EXAMPLE/controller.env.EXAMPLE ../config/controller.env + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 From ab2e0c41bea23746a3e1c9f8503e78e6a2492565 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 21:46:40 +0000 Subject: [PATCH 09/17] context dir is all confused --- .github/workflows/pytest.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index ea05080..a88f1a5 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -47,6 +47,7 @@ jobs: cp config.EXAMPLE/exporter.env.EXAMPLE ../config/exporter.env cp config.EXAMPLE/hasher.env.EXAMPLE ../config/hasher.env cp config.EXAMPLE/controller.env.EXAMPLE ../config/controller.env + ls -laR .. 
- name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 @@ -54,8 +55,7 @@ jobs: - name: Build exporter image (cached) uses: docker/bake-action@v6 with: - workdir: waveform-controller - files: docker-compose.yml + files: waveform-controller/docker-compose.yml targets: waveform-exporter set: | *.cache-from=type=gha From 49f8c5972de5a9570a97b25d66a2a4ac8e494227 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 21:53:58 +0000 Subject: [PATCH 10/17] Try build-push-action instead --- .github/workflows/pytest.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index a88f1a5..c922bc7 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -53,13 +53,13 @@ jobs: uses: docker/setup-buildx-action@v3 - name: Build exporter image (cached) - uses: docker/bake-action@v6 + uses: docker/build-push-action@v6 with: - files: waveform-controller/docker-compose.yml - targets: waveform-exporter - set: | - *.cache-from=type=gha - *.cache-to=type=gha,mode=max + context: . + file: waveform-controller/Dockerfile + target: waveform_exporter + cache-from: type=gha + cache-to: type=gha,mode=max - name: Run the tests working-directory: waveform-controller From e9f748bc59ed569c018c6bae4c2026894ed89a61 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 30 Jan 2026 22:03:01 +0000 Subject: [PATCH 11/17] No longer need to create config file here if we're doing it in GHA script --- .github/workflows/pytest.yml | 1 - tests/test_snakemake_integration.py | 7 ------- 2 files changed, 8 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index c922bc7..eb2a872 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -47,7 +47,6 @@ jobs: cp config.EXAMPLE/exporter.env.EXAMPLE ../config/exporter.env cp config.EXAMPLE/hasher.env.EXAMPLE ../config/hasher.env cp config.EXAMPLE/controller.env.EXAMPLE ../config/controller.env - ls -laR .. - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index f008324..851dcec 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -45,13 +45,6 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): repo_root = Path(__file__).resolve().parents[1] compose_file = repo_root / "docker-compose.yml" - exporter_config_file = repo_root.parent / "config" / "exporter.env" - # This is mainly needed for Github actions because the docker-compose file requires the - # config file to exist, but be gentle because this might also be your development area! - # Empty is fine, actual config is passed through on the command line later. - exporter_config_file.parent.mkdir(exist_ok=True) - exporter_config_file.touch() - date = "2025-01-01" # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later! 
csn = "SECRET_CSN_1234" From a6bf4b6336e83faa19cd59e728265e64843a1c19 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 2 Feb 2026 13:03:33 +0000 Subject: [PATCH 12/17] Check that original data matches original parquet, and that parquet files match each other where appropriate --- tests/test_snakemake_integration.py | 164 ++++++++++++++++++++++------ 1 file changed, 129 insertions(+), 35 deletions(-) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 851dcec..140351d 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -1,13 +1,19 @@ import json +import math import os import re +from dataclasses import dataclass +from decimal import Decimal +import random +import pyarrow as pa import pyarrow.parquet as pq import subprocess import time from pathlib import Path import pytest +from stablehash import stablehash from src.pseudon.hashing import do_hash @@ -41,30 +47,114 @@ def build_exporter_image(): result.check_returncode() -def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): - repo_root = Path(__file__).resolve().parents[1] - compose_file = repo_root / "docker-compose.yml" - - date = "2025-01-01" - # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later! - csn = "SECRET_CSN_1234" - mrn = "SECRET_MRN_12345" - loc = "SECRET_LOCATION_123" - variable_id = "11" - channel_id = "3" - units = "uV" - +@dataclass +class TestFileDescription: + date: str + csn: str + mrn: str + location: str + variable_id: str + channel_id: str + sampling_rate: int + units: str + num_rows: int + _test_values: list = None + + def get_hashed_csn(self): + return do_hash("csn", self.csn) + + def get_orig_csv(self): + return f"{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.csv" + + def get_orig_parquet(self): + return f"{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_pseudon_parquet(self): + return f"{self.date}.{self.get_hashed_csn()}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_hashes(self): + return f"{self.date}.hashes.json" + + def get_stable_hash(self): + """To aid in generating different but repeatable test data for each file.""" + return stablehash( + ( + self.date, + self.csn, + self.mrn, + self.location, + self.variable_id, + self.channel_id, + ) + ) + + def get_stable_seed(self): + byte_hash = self.get_stable_hash().digest()[:4] + return int.from_bytes(byte_hash) + + def generate_data(self, vals_per_row: int): + if self._test_values is None: + seed = self.get_stable_seed() + rng = random.Random(seed) + base_ampl = rng.normalvariate(1, 0.2) + base_offset = rng.normalvariate(0, 0.2) + self._test_values = [] + for row_num in range(self.num_rows): + values_row = [ + Decimal.from_float( + base_ampl * math.sin(base_offset + row_num * vals_per_row + i) + ).quantize(Decimal("1.0000")) + for i in range(vals_per_row) + ] + self._test_values.append(values_row) + # return as string but keep the numerical representation for comparison to parquet later + return self._test_values + + +def _make_test_input_csv(tmp_path, t: TestFileDescription): original_csv_dir = tmp_path / "original-csv" original_csv_dir.mkdir(parents=True, exist_ok=True) - csv_path = original_csv_dir / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.csv" - csv_path.write_text( - ",".join(EXPECTED_COLUMN_NAMES) + "\n" - f'{csn},{mrn},{variable_id},{channel_id},{units},100,1769795156.0,{loc},"[1.0,2.0]"\n' - 
f'{csn},{mrn},{variable_id},{channel_id},{units},100,1769795157.0,{loc},"[3.0, 4.0]"\n' - ) + csv_path = original_csv_dir / t.get_orig_csv() + secs_per_row = 1 + vals_per_row = t.sampling_rate * secs_per_row + test_data = t.generate_data(vals_per_row) + with open(csv_path, "w") as f: + f.write(",".join(EXPECTED_COLUMN_NAMES) + "\n") + start_time = time.time() - 15 * 60 + row_time = start_time + for td in test_data: + row_values_str = ", ".join(str(v) for v in td) + f.write( + f'{t.csn},{t.mrn},{t.variable_id},{t.channel_id},{t.units},{t.sampling_rate},{row_time},{t.location},"[{row_values_str}]"\n' + ) + row_time += secs_per_row # The test input CSV file needs to be old enough so that snakemake doesn't skip it old_time = time.time() - (10 * 60) os.utime(csv_path, (old_time, old_time)) + return test_data + + +def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): + repo_root = Path(__file__).resolve().parents[1] + compose_file = repo_root / "docker-compose.yml" + + # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later + file1 = TestFileDescription( + "2025-01-01", + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "11", + "3", + 100, + "uV", + 5, + ) + test_data1 = _make_test_input_csv(tmp_path, file1) + # file2 = _make_test_input_csv(tmp_path) + # file3 = _make_test_input_csv(tmp_path) + + # file1.channel_id, file1.csn, file1.date, file1.units, file1.variable_id compose_args = [ "run", @@ -97,34 +187,38 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") result.check_returncode() - expected_hashed_csn = do_hash("csn", csn) - original_parquet_path = ( - tmp_path - / "original-parquet" - / f"{date}.{csn}.{variable_id}.{channel_id}.{units}.parquet" - ) - pseudon_path = ( - tmp_path - / "pseudonymised" - / f"{date}.{expected_hashed_csn}.{variable_id}.{channel_id}.{units}.parquet" - ) - hash_lookup_path = tmp_path / "hash-lookups" / f"{date}.hashes.json" + expected_hashed_csn = file1.get_hashed_csn() + original_parquet_path = tmp_path / "original-parquet" / file1.get_orig_parquet() + pseudon_path = tmp_path / "pseudonymised" / file1.get_pseudon_parquet() + hash_lookup_path = tmp_path / "hash-lookups" / file1.get_hashes() assert original_parquet_path.exists() assert pseudon_path.exists() assert hash_lookup_path.exists() - # does our CSN -> hashed_csn + # inspect our CSN -> hashed_csn lookup file hash_lookup = json.loads(hash_lookup_path.read_text()) assert isinstance(hash_lookup, list) assert any( - entry.get("csn") == csn and entry.get("hashed_csn") == expected_hashed_csn + entry.get("csn") == file1.csn and entry.get("hashed_csn") == expected_hashed_csn for entry in hash_lookup ) - _check_parquets(original_parquet_path, pseudon_path) + _compare_original_parquet_to_expected(original_parquet_path, test_data1) + _compare_parquets(test_data1, original_parquet_path, pseudon_path) + + +def _compare_original_parquet_to_expected(original_parquet: Path, expected_test_values): + # CSV should always match original parquet + orig_parquet_file = pq.ParquetFile(original_parquet) + orig_reader = orig_parquet_file.read() + orig_all_values = orig_reader["values"].combine_chunks() + expected_pa = pa.array(expected_test_values, type=orig_all_values.type) + assert orig_all_values == expected_pa -def _check_parquets(original_parquet_path: Path, pseudon_parquet_path: Path): +def _compare_parquets( + expected_test_values, original_parquet_path: Path, 
pseudon_parquet_path: Path +): # columns where we expect the values to differ due to pseudonymisation COLUMN_EXPECT_DIFFERENT = ["csn", "mrn", "location"] orig_parquet_file = pq.ParquetFile(original_parquet_path) From 2dfaeb36c9637e934cb222b336254733f7366946 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 2 Feb 2026 20:39:47 +0000 Subject: [PATCH 13/17] Add enough test data to be sure hash summaries are summarising properly --- tests/test_snakemake_integration.py | 130 +++++++++++++++++++++++----- 1 file changed, 106 insertions(+), 24 deletions(-) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 140351d..dc771d8 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -50,6 +50,7 @@ def build_exporter_image(): @dataclass class TestFileDescription: date: str + start_timestamp: float csn: str mrn: str location: str @@ -92,7 +93,7 @@ def get_stable_seed(self): byte_hash = self.get_stable_hash().digest()[:4] return int.from_bytes(byte_hash) - def generate_data(self, vals_per_row: int): + def generate_data(self, vals_per_row: int) -> list[list[Decimal]]: if self._test_values is None: seed = self.get_stable_seed() rng = random.Random(seed) @@ -111,7 +112,7 @@ def generate_data(self, vals_per_row: int): return self._test_values -def _make_test_input_csv(tmp_path, t: TestFileDescription): +def _make_test_input_csv(tmp_path, t: TestFileDescription) -> list[list[Decimal]]: original_csv_dir = tmp_path / "original-csv" original_csv_dir.mkdir(parents=True, exist_ok=True) csv_path = original_csv_dir / t.get_orig_csv() @@ -120,7 +121,7 @@ def _make_test_input_csv(tmp_path, t: TestFileDescription): test_data = t.generate_data(vals_per_row) with open(csv_path, "w") as f: f.write(",".join(EXPECTED_COLUMN_NAMES) + "\n") - start_time = time.time() - 15 * 60 + start_time = t.start_timestamp row_time = start_time for td in test_data: row_values_str = ", ".join(str(v) for v in td) @@ -135,12 +136,14 @@ def _make_test_input_csv(tmp_path, t: TestFileDescription): def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): + # ARRANGE repo_root = Path(__file__).resolve().parents[1] compose_file = repo_root / "docker-compose.yml" # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later file1 = TestFileDescription( "2025-01-01", + 1735740780.0, "SECRET_CSN_1234", "SECRET_MRN_12345", "SECRET_LOCATION_123", @@ -150,12 +153,81 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): "uV", 5, ) - test_data1 = _make_test_input_csv(tmp_path, file1) - # file2 = _make_test_input_csv(tmp_path) - # file3 = _make_test_input_csv(tmp_path) - - # file1.channel_id, file1.csn, file1.date, file1.units, file1.variable_id - + # same day, same CSN, earlier time + file2 = TestFileDescription( + "2025-01-01", + 1735740765.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 2, + ) + # same day, different CSN + file3 = TestFileDescription( + "2025-01-01", + 1735740783.0, + "SECRET_CSN_1235", + "SECRET_MRN_12346", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 4, + ) + # new day, first CSN again + file4 = TestFileDescription( + "2025-01-02", + 1735801965.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 5, + ) + test_data_files = [] + for f in [file1, file2, file3, file4]: + test_data = _make_test_input_csv(tmp_path, f) + test_data_files.append((f, test_data)) + 
expected_hash_summaries = { + "2025-01-01": [ + { + "csn": file1.csn, + "hashed_csn": file1.get_hashed_csn(), + "min_timestamp": file2.start_timestamp, + "max_timestamp": ( + file1.start_timestamp + file1.num_rows - 1 + ), # one sec per row + }, + { + "csn": file3.csn, + "hashed_csn": file3.get_hashed_csn(), + "min_timestamp": file3.start_timestamp, + "max_timestamp": ( + file3.start_timestamp + file3.num_rows - 1 + ), # one sec per row + }, + ], + "2025-01-02": [ + { + "csn": file4.csn, + "hashed_csn": file4.get_hashed_csn(), + "min_timestamp": file4.start_timestamp, + "max_timestamp": ( + file4.start_timestamp + file4.num_rows - 1 + ), # one sec per row + } + ], + } + + # ACT compose_args = [ "run", "--rm", @@ -187,24 +259,34 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") result.check_returncode() - expected_hashed_csn = file1.get_hashed_csn() - original_parquet_path = tmp_path / "original-parquet" / file1.get_orig_parquet() - pseudon_path = tmp_path / "pseudonymised" / file1.get_pseudon_parquet() - hash_lookup_path = tmp_path / "hash-lookups" / file1.get_hashes() + # ASSERT + for filename, expected_data in test_data_files: + original_parquet_path = ( + tmp_path / "original-parquet" / filename.get_orig_parquet() + ) + pseudon_path = tmp_path / "pseudonymised" / filename.get_pseudon_parquet() - assert original_parquet_path.exists() - assert pseudon_path.exists() - assert hash_lookup_path.exists() + assert original_parquet_path.exists() + assert pseudon_path.exists() + _compare_original_parquet_to_expected(original_parquet_path, expected_data) + _compare_parquets(expected_data, original_parquet_path, pseudon_path) + + # Check hash summaries: one per day, not per input file # inspect our CSN -> hashed_csn lookup file - hash_lookup = json.loads(hash_lookup_path.read_text()) - assert isinstance(hash_lookup, list) - assert any( - entry.get("csn") == file1.csn and entry.get("hashed_csn") == expected_hashed_csn - for entry in hash_lookup - ) - _compare_original_parquet_to_expected(original_parquet_path, test_data1) - _compare_parquets(test_data1, original_parquet_path, pseudon_path) + for datestr, expected_summary in expected_hash_summaries.items(): + expected_path = tmp_path / "hash-lookups" / f"{datestr}.hashes.json" + actual_hash_lookup_data = json.loads(expected_path.read_text()) + assert isinstance(actual_hash_lookup_data, list) + # sort order to match expected + actual_hash_lookup_data.sort(key=lambda x: x["csn"]) + assert expected_summary == actual_hash_lookup_data + + # check no extraneous files + assert 4 == len(list((tmp_path / "original-csv").iterdir())) + assert 4 == len(list((tmp_path / "original-parquet").iterdir())) + assert 4 == len(list((tmp_path / "pseudonymised").iterdir())) + assert 2 == len(list((tmp_path / "hash-lookups").iterdir())) def _compare_original_parquet_to_expected(original_parquet: Path, expected_test_values): From 9afbfe74a834467ee93084638d2b5e83b999d92a Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 2 Feb 2026 20:44:33 +0000 Subject: [PATCH 14/17] Make test method more readable --- tests/test_snakemake_integration.py | 72 ++++++++++++++++------------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index dc771d8..59b56b2 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -137,8 +137,6 @@ def _make_test_input_csv(tmp_path, t: 
TestFileDescription) -> list[list[Decimal] def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): # ARRANGE - repo_root = Path(__file__).resolve().parents[1] - compose_file = repo_root / "docker-compose.yml" # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later file1 = TestFileDescription( @@ -194,8 +192,9 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): ) test_data_files = [] for f in [file1, file2, file3, file4]: - test_data = _make_test_input_csv(tmp_path, f) - test_data_files.append((f, test_data)) + test_data_values = _make_test_input_csv(tmp_path, f) + test_data_files.append((f, test_data_values)) + expected_hash_summaries = { "2025-01-01": [ { @@ -228,6 +227,42 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): } # ACT + run_snakemake(tmp_path) + + # ASSERT (data files) + for filename, expected_data in test_data_files: + original_parquet_path = ( + tmp_path / "original-parquet" / filename.get_orig_parquet() + ) + pseudon_path = tmp_path / "pseudonymised" / filename.get_pseudon_parquet() + + assert original_parquet_path.exists() + assert pseudon_path.exists() + + _compare_original_parquet_to_expected(original_parquet_path, expected_data) + _compare_parquets(expected_data, original_parquet_path, pseudon_path) + + # ASSERT (hash summaries) + # Hash summaries are one per day, not per input file + for datestr, expected_summary in expected_hash_summaries.items(): + expected_path = tmp_path / "hash-lookups" / f"{datestr}.hashes.json" + actual_hash_lookup_data = json.loads(expected_path.read_text()) + assert isinstance(actual_hash_lookup_data, list) + # sort order to match expected + actual_hash_lookup_data.sort(key=lambda x: x["csn"]) + assert expected_summary == actual_hash_lookup_data + + # check no extraneous files + assert 4 == len(list((tmp_path / "original-csv").iterdir())) + assert 4 == len(list((tmp_path / "original-parquet").iterdir())) + assert 4 == len(list((tmp_path / "pseudonymised").iterdir())) + assert 2 == len(list((tmp_path / "hash-lookups").iterdir())) + + +def run_snakemake(tmp_path): + repo_root = Path(__file__).resolve().parents[1] + compose_file = repo_root / "docker-compose.yml" + compose_args = [ "run", "--rm", @@ -259,35 +294,6 @@ def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") result.check_returncode() - # ASSERT - for filename, expected_data in test_data_files: - original_parquet_path = ( - tmp_path / "original-parquet" / filename.get_orig_parquet() - ) - pseudon_path = tmp_path / "pseudonymised" / filename.get_pseudon_parquet() - - assert original_parquet_path.exists() - assert pseudon_path.exists() - - _compare_original_parquet_to_expected(original_parquet_path, expected_data) - _compare_parquets(expected_data, original_parquet_path, pseudon_path) - - # Check hash summaries: one per day, not per input file - # inspect our CSN -> hashed_csn lookup file - for datestr, expected_summary in expected_hash_summaries.items(): - expected_path = tmp_path / "hash-lookups" / f"{datestr}.hashes.json" - actual_hash_lookup_data = json.loads(expected_path.read_text()) - assert isinstance(actual_hash_lookup_data, list) - # sort order to match expected - actual_hash_lookup_data.sort(key=lambda x: x["csn"]) - assert expected_summary == actual_hash_lookup_data - - # check no extraneous files - assert 4 == len(list((tmp_path / "original-csv").iterdir())) - assert 4 == 
len(list((tmp_path / "original-parquet").iterdir())) - assert 4 == len(list((tmp_path / "pseudonymised").iterdir())) - assert 2 == len(list((tmp_path / "hash-lookups").iterdir())) - def _compare_original_parquet_to_expected(original_parquet: Path, expected_test_values): # CSV should always match original parquet From 640335a1bba9de6ec5fbea39f2fb74974b54f6af Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 5 Feb 2026 12:18:55 +0000 Subject: [PATCH 15/17] Add warning about using old Docker (Compose) --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 4743482..5f24fc9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,12 @@ A controller for reading waveform data from a rabbitmq queue and processing it. # Running the Code + +## Pre-reqs + +Up-to-date Docker and Docker Compose. We have seen config bugs when using old Docker Compose versions, +such as that packaged with recent Ubuntu LTS. Docker Compose v5.0.1 and Docker 29.1.5 are known to work. + ## 1 Install and deploy EMAP Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") configure and deploy a version of EMAP. To run a local version you'll need to set From df786db9a9d9e5d452d6b8e38e9c793159bcc2d4 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 5 Feb 2026 12:20:50 +0000 Subject: [PATCH 16/17] Remove public method as per review suggestion --- src/pipeline/Snakefile | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 15476a0..97a69ac 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -76,21 +76,18 @@ class InputCsvFile: units=self.units ) - def get_subs_dict(self): - return self._subs_dict - def get_original_csv_path(self) -> Path: - return Path(make_file_name(str(CSV_PATTERN), self.get_subs_dict())) + return Path(make_file_name(str(CSV_PATTERN), self._subs_dict)) def get_original_parquet_path(self) -> Path: - return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self.get_subs_dict())) + return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict)) def get_pseudonymised_parquet_path(self) -> Path: - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self.get_subs_dict()) + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" def get_ftps_uploaded_file(self) -> Path: - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self.get_subs_dict()) + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") def get_daily_hash_lookup(self) -> Path: From 177a002fb3d9ba6b53d11b25e76f46c935de1f87 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 5 Feb 2026 12:36:14 +0000 Subject: [PATCH 17/17] Clarify comment on our usage of Snakemake --- src/pipeline/Snakefile | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 97a69ac..2775b22 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -231,8 +231,11 @@ def pseudonymised_parquet_files_for_date(wc): rule make_daily_hash_lookup: input: - # we declare the pseud parquets here to make the DAG work, but - # then later we "cheat" and use InputCsvFile objects to get the original CSN + # Because we don't declare the original parquets in the output of 
csv_to_parquet,
+        # they are not present in the Snakemake dependency DAG. Therefore, we can't reference them
+        # here either.
+        # We lie to Snakemake that the input is the pseudon parquets, but then we use InputCsvFile
+        # to determine the actual input file (the original parquets).
         pseudonymised_parquets = pseudonymised_parquet_files_for_date
     output:
         hash_lookup_json = WAVEFORM_HASH_LOOKUPS / "{date}.hashes.json"
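
For reference, each {date}.hashes.json written by the make_daily_hash_lookup rule is a JSON list of entries with "csn", "hashed_csn", "min_timestamp" and "max_timestamp" (epoch-second timestamps taken from the original parquet column statistics, as exercised by the integration test above). A minimal sketch of how such a daily summary could drive the retrospective flowsheet query follows; the helper name, return shape and example values are illustrative assumptions, not code from this repository.

import json
from pathlib import Path


def csns_overlapping(hash_lookup_dir: Path, date: str, start_ts: float, end_ts: float) -> dict[str, str]:
    """Return csn -> hashed_csn for entries whose [min_timestamp, max_timestamp] range
    overlaps the query window [start_ts, end_ts] on the given day (illustrative sketch only)."""
    entries = json.loads((hash_lookup_dir / f"{date}.hashes.json").read_text())
    return {
        e["csn"]: e["hashed_csn"]
        for e in entries
        # interval overlap: the entry's timestamp range intersects the query window
        if e["min_timestamp"] <= end_ts and e["max_timestamp"] >= start_ts
    }


# Example usage (hypothetical values, paths as defined in src/locations.py):
# csns_overlapping(Path("/waveform-export/hash-lookups"), "2025-01-01", 1735740700.0, 1735740800.0)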