Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ jobs:
echo "AZURE_KEY_VAULT_NAME=${AZURE_KEY_VAULT_NAME}"
echo "AZURE_KEY_VAULT_SECRET_NAME=${AZURE_KEY_VAULT_SECRET_NAME}"
} >> ../config/hasher.env
# Don't attempt to upload FTPS from GHA
{
echo "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups"
} >> ../config/exporter.env

# exporter config is generated from within pytest so no need to do it here

- name: Run the tests
working-directory: waveform-controller
Expand Down
4 changes: 4 additions & 0 deletions config.EXAMPLE/exporter.env.EXAMPLE
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ HASHER_API_PORT=8000
# its output in the FTPS output.
# Do not use the name "production" anywhere outside production!
INSTANCE_NAME=

# How long must it have been since an original CSV was last modified before
# progressing it down the pipeline?
CSV_AGE_THRESHOLD_MINUTES=180
1 change: 1 addition & 0 deletions exporter-scripts/scheduled-script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ set +e
snakemake --snakefile /app/src/pipeline/Snakefile \
--cores "$SNAKEMAKE_CORES" \
--until "$SNAKEMAKE_RULE_UNTIL" \
--config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" \
>> "$outer_log_file" 2>&1
ret_code=$?
set -e
Expand Down
7 changes: 4 additions & 3 deletions src/pipeline/Snakefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
Expand Down Expand Up @@ -41,7 +42,7 @@ def hash_csn(csn: str) -> str:
# protect against very out of order data though)
# Also note that snakemake can detect and re-run if the file is subsequently updated,
# so being very cautious here may not be necessary.
CSV_WAIT_TIME = timedelta(minutes=5)
CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MINUTES']))

# The problem here is that snakemake works "backwards" in that you have to define which
# files you want to be produced, yet we don't know what output files we need until the
Expand Down Expand Up @@ -116,12 +117,12 @@ def determine_eventual_outputs():
print(f"Skipping file with unmatched CSN: {orig_file}")
continue
file_age = get_file_age(orig_file)
if file_age < CSV_WAIT_TIME:
if file_age < CSV_AGE_THRESHOLD_MINUTES:
print(f"File too new (age={file_age}): {orig_file}")
continue
_all_outputs.append(input_file_obj)
after = time.perf_counter()
print(f"Calculated output files using newness threshold {CSV_WAIT_TIME} in {after - before} seconds")
print(f"Snakemake [pid={os.getpid()}]: Determined {len(_all_outputs)} output files using newness threshold {CSV_AGE_THRESHOLD_MINUTES} in {after - before} seconds")
return _all_outputs, _hash_to_csn


Expand Down
1 change: 1 addition & 0 deletions tests/test_snakemake_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ def _run_snakemake(tmp_path):
"SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups\n"
"SNAKEMAKE_CORES=1\n"
"INSTANCE_NAME=pytest\n"
"CSV_AGE_THRESHOLD_MINUTES=5\n"
)
# run system under test (exporter container) in foreground
compose_args = [
Expand Down