diff --git a/README.md b/README.md
index 0b95ef6..2b5427a 100644
--- a/README.md
+++ b/README.md
@@ -70,11 +70,22 @@
 cp waveform-controller/config.EXAMPLE/hasher.env.EXAMPLE config/hasher.env
 
 From the new config files, remove the comments telling you not to put secrets in it, as instructed.
 
 #### Fill in config files
-Fill out the config, as appropriate.
+The config files contain documentation in comments, but some settings are discussed further here.
 See [azure and hasher setup](docs/azure_hashing.md) to configure the hasher.
-When updating to a new version of this code, you should diff the .EXAMPLE file against its live version,
+`INSTANCE_NAME` in `exporter.env` should always be set to a non-empty, globally unique value. It is used to
+identify which instance of the exporter produced the files uploaded to the FTPS server,
+so that production data cannot be mixed up with synthetic data.
+The `INSTANCE_NAME` isn't used in file paths on the system where the instance runs: it's assumed
+that only one instance uses each filesystem location, so a local prefix would be redundant. It's only used on the
+FTPS server.
+Example names:
+* `yourname-dev` for your dev instance
+* `production` for production ONLY
+
+
+When deploying a new version of this code, you should diff the .EXAMPLE file against its live version,
 eg. by running `vimdiff waveform-controller/config.EXAMPLE/controller.env.EXAMPLE config/controller.env`.
 
 This checks if any config options have been added/removed from the .EXAMPLE, and thus should be
diff --git a/config.EXAMPLE/exporter.env.EXAMPLE b/config.EXAMPLE/exporter.env.EXAMPLE
index 134ac38..7448419 100644
--- a/config.EXAMPLE/exporter.env.EXAMPLE
+++ b/config.EXAMPLE/exporter.env.EXAMPLE
@@ -9,6 +9,13 @@ FTPS_USERNAME=
 FTPS_PASSWORD=
 # only run workflow up to and including the specified rule
 SNAKEMAKE_RULE_UNTIL=
+# number of cores to dedicate to snakemake (default=1)
+SNAKEMAKE_CORES=
 # point to the hasher we wish to use
 HASHER_API_HOSTNAME=waveform-hasher
 HASHER_API_PORT=8000
+# A name for this instance of the software, used to identify
+# its output on the FTPS server.
+# Do not use the name "production" anywhere outside production!
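+# (example: INSTANCE_NAME=yourname-dev for a dev instance, as described in the README)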
+INSTANCE_NAME=
diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh
index 02f9e3b..e3d3e41 100755
--- a/exporter-scripts/scheduled-script.sh
+++ b/exporter-scripts/scheduled-script.sh
@@ -13,7 +13,6 @@ date_str=$(date --utc +"%Y%m%dT%H%M%S")
 outer_log_file="/waveform-export/snakemake-logs/snakemake-outer-log${date_str}.log"
 # snakemake has not run yet so will not create the log dir; do it manually
 mkdir -p "$(dirname "$outer_log_file")"
-echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file"
 touch "$outer_log_file"
 # bring in envs from file because cron gives us a clean environment
 set -a
@@ -21,6 +20,7 @@ source /config/exporter.env
 set +a
 # Now that we have loaded config file, apply default values
 SNAKEMAKE_CORES="${SNAKEMAKE_CORES:-1}"
+echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file"
 # For telling the pipeline not to go all the way
 SNAKEMAKE_RULE_UNTIL="${SNAKEMAKE_RULE_UNTIL:-all}"
 set +e
diff --git a/src/exporter/ftps.py b/src/exporter/ftps.py
index e7dd141..3f6a040 100644
--- a/src/exporter/ftps.py
+++ b/src/exporter/ftps.py
@@ -50,7 +50,7 @@ def do_upload(abs_file_to_upload: Path):
         settings.FTPS_USERNAME,
         settings.FTPS_PASSWORD,
     )
-    remote_project_dir = "waveform-export"
+    remote_project_dir = str(Path("waveform-export") / settings.INSTANCE_NAME)
     _create_and_set_as_cwd(ftp, remote_project_dir)
     remote_filename = os.path.basename(file_to_upload)
     command = f"STOR {remote_filename}"
diff --git a/src/pseudon/pseudon.py b/src/pseudon/pseudon.py
index 8e5e907..3258f00 100644
--- a/src/pseudon/pseudon.py
+++ b/src/pseudon/pseudon.py
@@ -1,12 +1,15 @@
 import argparse
 import functools
+import json
 import logging
 from decimal import Decimal
 from pathlib import Path
+from typing import Any
 
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+import settings
 
 from locations import (
     WAVEFORM_ORIGINAL_PARQUET,
@@ -113,6 +116,11 @@ def parse_array(x):
     )
     table = pa.Table.from_pandas(df, schema=schema, preserve_index=True)
 
+    # mark the parquet files themselves as coming from production or not
+    our_metadata = {"instance_name": settings.INSTANCE_NAME}
+
+    table = add_metadata_to_table(table, our_metadata)
+
     original_parquet_path = WAVEFORM_ORIGINAL_PARQUET / (csv_path.stem + ".parquet")
     pq.write_table(
         table,
@@ -130,6 +138,10 @@ def parse_array(x):
     df = pseudonymise_relevant_columns(df)
     pseudon_table = pa.Table.from_pandas(df, schema=schema, preserve_index=True)
 
+    # Use the same metadata for the pseudonymised table; it must not contain identifiers!
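+    # (the metadata currently holds only the instance name, which is safe to share)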
+    pseudon_table = add_metadata_to_table(pseudon_table, our_metadata)
+
     hashed_path = Path(
         str(PSEUDONYMISED_PARQUET_PATTERN).format(
             date=date_str,
@@ -152,6 +163,21 @@
     )
 
 
+# The convention seems to be that you put all your data as JSON under a single key
+WAVEFORM_EXPORTER_METADATA_KEY = b"waveform_exporter"
+
+
+def add_metadata_to_table(
+    table: pa.Table, our_metadata: dict[str, Any]
+) -> pa.Table:
+    existing_metadata = table.schema.metadata or {}
+    json_byte_string = json.dumps(our_metadata).encode("utf-8")
+    table = table.replace_schema_metadata(
+        {**existing_metadata, WAVEFORM_EXPORTER_METADATA_KEY: json_byte_string}
+    )
+    return table
+
+
 SAFE_COLUMNS = [
     "sampling_rate",
     "source_variable_id",
diff --git a/src/settings.py b/src/settings.py
index 8aaed85..000e186 100644
--- a/src/settings.py
+++ b/src/settings.py
@@ -32,3 +32,5 @@ def get_from_env(env_var, *, default_value=None, setting_name=None):
 
 get_from_env("HASHER_API_HOSTNAME")
 get_from_env("HASHER_API_PORT")
+
+get_from_env("INSTANCE_NAME", default_value="UNKNOWN-INSTANCE")
diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py
index 74b3e58..5fb5f5f 100644
--- a/tests/test_snakemake_integration.py
+++ b/tests/test_snakemake_integration.py
@@ -15,6 +15,8 @@ import pytest
 from stablehash import stablehash
 
+from pseudon.pseudon import WAVEFORM_EXPORTER_METADATA_KEY
+
 
 def _run_compose(
     compose_file: Path, args: list[str], cwd: Path
@@ -301,6 +303,10 @@ def test_snakemake_pipeline(tmp_path: Path, background_hasher):
     _compare_original_parquet_to_expected(original_parquet_path, expected_data)
     _compare_parquets(expected_data, original_parquet_path, pseudon_path)
+    # check that metadata recording the instance name is in both parquet files
+    expected_metadata = {"instance_name": "pytest"}
+    _assert_parquet_footer_metadata(pseudon_path, expected_metadata)
+    _assert_parquet_footer_metadata(original_parquet_path, expected_metadata)
 
     # ASSERT (hash summaries)
     # Hash summaries are one per day, not per input file
 
@@ -320,6 +326,19 @@
 
 
 def _run_snakemake(tmp_path):
+    # Config is a right pain: the exporter has a blank environment because it's launched by cron,
+    # so nothing passed in as an env var will be seen.
+    # It works around this in normal use by reading env vars only from the bind-mounted exporter.env file.
+    # So to use a different config during test, we must override that file with a special version
+    # that we create here.
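+    # The override file is bind-mounted read-only over /config/exporter.env via the "-v" flag below.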
+    tmp_exporter_env_path = tmp_path / "config/exporter.env"
+    tmp_exporter_env_path.parent.mkdir(exist_ok=True)
+    tmp_exporter_env_path.write_text(
+        "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups\n"
+        "SNAKEMAKE_CORES=1\n"
+        "INSTANCE_NAME=pytest\n"
+    )
     # run system under test (exporter container) in foreground
     compose_args = [
         "run",
@@ -327,12 +345,11 @@ def _run_snakemake(tmp_path):
         # we override the volume defined in the compose file to be the pytest tmp path
         "-v",
         f"{tmp_path}:/waveform-export",
+        # feed in our special config file
+        "-v",
+        f"{tmp_exporter_env_path}:/config/exporter.env:ro",
         "--entrypoint",
         "/app/exporter-scripts/scheduled-script.sh",
-        "-e",
-        "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups",
-        "-e",
-        "SNAKEMAKE_CORES=1",
         "waveform-exporter",
     ]
     result = _run_compose(
@@ -393,3 +410,21 @@
     assert all(
         "SECRET" in str(v) for v in orig_all_values
     ), f"{orig_all_values} in column {column_name} contains SECRET string"
+
+
+def _assert_parquet_footer_metadata(
+    parquet_path: Path, expected_values: dict[str, str]
+):
+    parquet_file = pq.ParquetFile(parquet_path)
+    footer_metadata: dict[bytes, bytes] = parquet_file.metadata.metadata
+    actual_metadata_dict = json.loads(
+        footer_metadata[WAVEFORM_EXPORTER_METADATA_KEY].decode("utf-8")
+    )
+    for expected_key, expected_val in expected_values.items():
+        assert (
+            expected_val == actual_metadata_dict[expected_key]
+        ), f"{parquet_path}: value mismatch for {expected_key}"
+    # no metadata items contain identifiers
+    for actual_key, actual_val in actual_metadata_dict.items():
+        assert "SECRET" not in actual_key
+        assert "SECRET" not in actual_val
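
To spot-check the embedded footer metadata on an exported parquet file by hand, something like the
following should work (a minimal sketch: the file name is a placeholder, and the read-back calls
mirror the ones used in _assert_parquet_footer_metadata above):

    import json

    import pyarrow.parquet as pq

    # Footer metadata is a dict mapping byte-string keys to byte-string values;
    # the exporter stores its values as JSON under the single b"waveform_exporter" key.
    footer = pq.ParquetFile("example.parquet").metadata.metadata
    ours = json.loads(footer[b"waveform_exporter"].decode("utf-8"))
    print(ours)  # e.g. {"instance_name": "yourname-dev"}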