15 changes: 13 additions & 2 deletions README.md
@@ -70,11 +70,22 @@ cp waveform-controller/config.EXAMPLE/hasher.env.EXAMPLE config/hasher.env
From the new config files, remove the comments telling you not to put secrets in them, as instructed.

#### Fill in config files
Fill out the config files as appropriate.
The config files contain documentation in comments, but some options are discussed in more detail here.

See [azure and hasher setup](docs/azure_hashing.md) to configure the hasher.

`INSTANCE_NAME` in `exporter.env` should always be set to a non-empty, globally unique value. It is used to
identify this instance of the exporter when files are uploaded to the FTPS server,
which prevents production data being mixed up with synthetic data.
`INSTANCE_NAME` is not used in file paths on the system where the instance is running: it's assumed
that only one instance will be using each filesystem location, so it isn't needed there. It's only used on the
FTPS server.
Example names:
* `yourname-dev` for your dev instance
* `production` for production ONLY
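
As a concrete illustration of where uploads end up (based on the `src/exporter/ftps.py` change later in this diff), a minimal sketch:

```python
from pathlib import Path

# Minimal sketch (mirrors the path construction in src/exporter/ftps.py):
# each instance's uploads go to their own subdirectory on the FTPS server.
for instance_name in ("yourname-dev", "production"):
    print(Path("waveform-export") / instance_name)
# waveform-export/yourname-dev
# waveform-export/production
```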


When deploying a new version of this code, you should diff the .EXAMPLE file against its live version,
e.g. by running `vimdiff waveform-controller/config.EXAMPLE/controller.env.EXAMPLE config/controller.env`.

This checks if any config options have been added/removed from the .EXAMPLE, and thus should be
6 changes: 6 additions & 0 deletions config.EXAMPLE/exporter.env.EXAMPLE
@@ -9,6 +9,12 @@ FTPS_USERNAME=
FTPS_PASSWORD=
# only run workflow up to and including the specified rule
SNAKEMAKE_RULE_UNTIL=
# number of cores to dedicate to snakemake (default=1)
SNAKEMAKE_CORES=
# point to the hasher we wish to use
HASHER_API_HOSTNAME=waveform-hasher
HASHER_API_PORT=8000
# A name for this instance of the software, used to identify
# its output on the FTPS server.
# Do not use the name "production" anywhere outside production!
INSTANCE_NAME=
2 changes: 1 addition & 1 deletion exporter-scripts/scheduled-script.sh
@@ -13,14 +13,14 @@ date_str=$(date --utc +"%Y%m%dT%H%M%S")
outer_log_file="/waveform-export/snakemake-logs/snakemake-outer-log${date_str}.log"
# snakemake has not run yet so will not create the log dir; do it manually
mkdir -p "$(dirname "$outer_log_file")"
echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file"
touch "$outer_log_file"
# bring in envs from file because cron gives us a clean environment
set -a
source /config/exporter.env
set +a
# Now that we have loaded config file, apply default values
SNAKEMAKE_CORES="${SNAKEMAKE_CORES:-1}"
echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file"
# For telling the pipeline not to go all the way
SNAKEMAKE_RULE_UNTIL="${SNAKEMAKE_RULE_UNTIL:-all}"
set +e
2 changes: 1 addition & 1 deletion src/exporter/ftps.py
@@ -50,7 +50,7 @@ def do_upload(abs_file_to_upload: Path):
settings.FTPS_USERNAME,
settings.FTPS_PASSWORD,
)
remote_project_dir = "waveform-export"
remote_project_dir = str(Path("waveform-export") / settings.INSTANCE_NAME)
_create_and_set_as_cwd(ftp, remote_project_dir)
remote_filename = os.path.basename(file_to_upload)
command = f"STOR {remote_filename}"
26 changes: 26 additions & 0 deletions src/pseudon/pseudon.py
@@ -1,12 +1,15 @@
import argparse
import functools
import json
import logging
from decimal import Decimal
from pathlib import Path
from typing import Any

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import settings

from locations import (
WAVEFORM_ORIGINAL_PARQUET,
@@ -113,6 +116,11 @@ def parse_array(x):
)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=True)

# mark the parquet files themselves as production or not.
our_metadata = {"instance_name": settings.INSTANCE_NAME}

table = add_metadata_to_table(table, our_metadata)

original_parquet_path = WAVEFORM_ORIGINAL_PARQUET / (csv_path.stem + ".parquet")
pq.write_table(
table,
@@ -130,6 +138,9 @@ def parse_array(x):
df = pseudonymise_relevant_columns(df)
pseudon_table = pa.Table.from_pandas(df, schema=schema, preserve_index=True)

# Use same metadata for pseudon, must not contain identifiers!
pseudon_table = add_metadata_to_table(pseudon_table, our_metadata)

hashed_path = Path(
str(PSEUDONYMISED_PARQUET_PATTERN).format(
date=date_str,
@@ -152,6 +163,21 @@ def parse_array(x):
)


# The convention seems to be that you put all your data as JSON under a single key
WAVEFORM_EXPORTER_METADATA_KEY = b"waveform_exporter"


def add_metadata_to_table(
pseudon_table: pa.Table, our_metadata: dict[str, Any]
) -> pa.Table:
existing_metadata = pseudon_table.schema.metadata or {}
json_byte_string = json.dumps(our_metadata).encode("utf-8")
pseudon_table = pseudon_table.replace_schema_metadata(
{**existing_metadata, WAVEFORM_EXPORTER_METADATA_KEY: json_byte_string}
)
return pseudon_table


SAFE_COLUMNS = [
"sampling_rate",
"source_variable_id",
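
To check that the footer metadata written by `add_metadata_to_table` round-trips, it can be read back with pyarrow. A minimal sketch (the `example.parquet` path is a placeholder), mirroring what the integration test below does:

```python
import json

import pyarrow.parquet as pq

# Read back the custom footer metadata written by add_metadata_to_table().
# "example.parquet" stands in for any parquet file produced by the pipeline.
footer = pq.ParquetFile("example.parquet").metadata.metadata  # dict[bytes, bytes]
our_metadata = json.loads(footer[b"waveform_exporter"].decode("utf-8"))
print(our_metadata["instance_name"])  # e.g. "yourname-dev"
```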
2 changes: 2 additions & 0 deletions src/settings.py
@@ -32,3 +32,5 @@ def get_from_env(env_var, *, default_value=None, setting_name=None):

get_from_env("HASHER_API_HOSTNAME")
get_from_env("HASHER_API_PORT")

get_from_env("INSTANCE_NAME", default_value="UNKNOWN-INSTANCE")
43 changes: 39 additions & 4 deletions tests/test_snakemake_integration.py
@@ -15,6 +15,8 @@
import pytest
from stablehash import stablehash

from pseudon.pseudon import WAVEFORM_EXPORTER_METADATA_KEY


def _run_compose(
compose_file: Path, args: list[str], cwd: Path
@@ -301,6 +303,10 @@ def test_snakemake_pipeline(tmp_path: Path, background_hasher):

_compare_original_parquet_to_expected(original_parquet_path, expected_data)
_compare_parquets(expected_data, original_parquet_path, pseudon_path)
# check metadata showing the instance name is in both parquet files
expected_data = {"instance_name": "pytest"}
_assert_parquet_footer_metadata(pseudon_path, expected_data)
_assert_parquet_footer_metadata(original_parquet_path, expected_data)

# ASSERT (hash summaries)
# Hash summaries are one per day, not per input file
@@ -320,19 +326,30 @@


def _run_snakemake(tmp_path):
# Config is a right pain. The exporter has a blank environment because it's launched by cron, so
# nothing passed in as an env var will be seen.
# It works around this in normal use by reading env vars only from the bind-mounted exporter.env file.
# So to use a different config during test, we must override that file with a special version
# that we create here.
tmp_exporter_env_path = tmp_path / "config/exporter.env"
tmp_exporter_env_path.parent.mkdir(exist_ok=True)
tmp_exporter_env_path.write_text(
"SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups\n"
"SNAKEMAKE_CORES=1\n"
"INSTANCE_NAME=pytest\n"
)
# run system under test (exporter container) in foreground
compose_args = [
"run",
"--rm",
# we override the volume defined in the compose file to be the pytest tmp path
"-v",
f"{tmp_path}:/waveform-export",
# feed in our special config file
"-v",
f"{tmp_exporter_env_path}:/config/exporter.env:ro",
"--entrypoint",
"/app/exporter-scripts/scheduled-script.sh",
"-e",
"SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups",
"-e",
"SNAKEMAKE_CORES=1",
"waveform-exporter",
]
result = _run_compose(
@@ -393,3 +410,21 @@ def _compare_parquets(
assert all(
"SECRET" in str(v) for v in orig_all_values
), f"{orig_all_values} in column {column_name} contains SECRET string"


def _assert_parquet_footer_metadata(
parquet_path: Path, expected_values: dict[str, str]
):
parquet_file = pq.ParquetFile(parquet_path)
footer_metadata: dict[bytes, bytes] = parquet_file.metadata.metadata
actual_metadata_dict = json.loads(
footer_metadata[WAVEFORM_EXPORTER_METADATA_KEY].decode("utf-8")
)
for expected_key, expected_val in expected_values.items():
assert (
expected_val == actual_metadata_dict[expected_key]
), f"{parquet_path} value mismatch"
# no metadata items contain identifiers
for actual_key, actual_val in actual_metadata_dict.items():
assert "SECRET" not in actual_key
assert "SECRET" not in actual_val