diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 08943d6..9144ffe 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -60,10 +60,8 @@ jobs:
             echo "AZURE_KEY_VAULT_NAME=${AZURE_KEY_VAULT_NAME}"
             echo "AZURE_KEY_VAULT_SECRET_NAME=${AZURE_KEY_VAULT_SECRET_NAME}"
           } >> ../config/hasher.env
-          # Don't attempt to upload FTPS from GHA
-          {
-            echo "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups"
-          } >> ../config/exporter.env
+
+          # exporter config is generated from within pytest so no need to do it here
 
       - name: Run the tests
         working-directory: waveform-controller
diff --git a/config.EXAMPLE/exporter.env.EXAMPLE b/config.EXAMPLE/exporter.env.EXAMPLE
index 7448419..214801c 100644
--- a/config.EXAMPLE/exporter.env.EXAMPLE
+++ b/config.EXAMPLE/exporter.env.EXAMPLE
@@ -18,3 +18,7 @@ HASHER_API_PORT=8000
 # its output in the FTPS output.
 # Do not use the name "production" anywhere outside production!
 INSTANCE_NAME=
+
+# How long must an original CSV have gone unmodified before it is
+# progressed down the pipeline?
+CSV_AGE_THRESHOLD_MINUTES=180
diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh
index e3d3e41..eb494fa 100755
--- a/exporter-scripts/scheduled-script.sh
+++ b/exporter-scripts/scheduled-script.sh
@@ -27,6 +27,7 @@ set +e
 snakemake --snakefile /app/src/pipeline/Snakefile \
     --cores "$SNAKEMAKE_CORES" \
     --until "$SNAKEMAKE_RULE_UNTIL" \
+    --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" \
     >> "$outer_log_file" 2>&1
 ret_code=$?
 set -e
diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile
index 2775b22..0e42b0e 100644
--- a/src/pipeline/Snakefile
+++ b/src/pipeline/Snakefile
@@ -1,4 +1,5 @@
 import json
+import os
 import time
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
@@ -41,7 +42,7 @@ def hash_csn(csn: str) -> str:
 # protect against very out of order data though)
 # Also note that snakemake can detect and re-run if the file is subsequently updated,
 # so being very cautious here may not be necessary.
-CSV_WAIT_TIME = timedelta(minutes=5)
+CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MINUTES']))
 
 # The problem here is that snakemake works "backwards" in that you have to define which
 # files you want to be produced, yet we don't know what output files we need until the
@@ -116,12 +117,12 @@ def determine_eventual_outputs():
             print(f"Skipping file with unmatched CSN: {orig_file}")
             continue
         file_age = get_file_age(orig_file)
-        if file_age < CSV_WAIT_TIME:
+        if file_age < CSV_AGE_THRESHOLD_MINUTES:
             print(f"File too new (age={file_age}): {orig_file}")
             continue
         _all_outputs.append(input_file_obj)
 
     after = time.perf_counter()
-    print(f"Calculated output files using newness threshold {CSV_WAIT_TIME} in {after - before} seconds")
+    print(f"Snakemake [pid={os.getpid()}]: Determined {len(_all_outputs)} output files using newness threshold {CSV_AGE_THRESHOLD_MINUTES} in {after - before} seconds")
 
     return _all_outputs, _hash_to_csn
diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py
index 5fb5f5f..ceaf8e5 100644
--- a/tests/test_snakemake_integration.py
+++ b/tests/test_snakemake_integration.py
@@ -337,6 +337,7 @@ def _run_snakemake(tmp_path):
         "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups\n"
         "SNAKEMAKE_CORES=1\n"
         "INSTANCE_NAME=pytest\n"
+        "CSV_AGE_THRESHOLD_MINUTES=5\n"
     )
     # run system under test (exporter container) in foreground
    compose_args = [
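
Note on the threshold comparison in the Snakefile hunk: get_file_age is referenced but not defined in this diff. A minimal mtime-based sketch (an assumption for illustration, not the repo's actual implementation) shows why comparing its result against a timedelta works:

    from datetime import datetime, timedelta, timezone
    from pathlib import Path

    # Hypothetical sketch of get_file_age; the real helper in src/pipeline may differ.
    def get_file_age(path: Path) -> timedelta:
        """Return how long ago the file was last modified."""
        mtime = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
        return datetime.now(timezone.utc) - mtime

Since both sides of "file_age < CSV_AGE_THRESHOLD_MINUTES" are timedelta values, the check is unit-safe however the threshold is configured.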
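Note on the --config plumbing in scheduled-script.sh: snakemake merges --config KEY=value pairs into the global config dict, and depending on the snakemake version the value may arrive as a string, so the explicit float(...) coercion in the Snakefile is a safe normalization. If the key could ever be absent (e.g. when invoking the Snakefile outside scheduled-script.sh), a fallback via config.get is one option; the default of 180 below merely mirrors exporter.env.EXAMPLE and is an assumption, not current repo behaviour:

    # Sketch with an assumed fallback; the diff as written requires the key to be set.
    CSV_AGE_THRESHOLD_MINUTES = timedelta(
        minutes=float(config.get("CSV_AGE_THRESHOLD_MINUTES", 180))
    )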