17 changes: 14 additions & 3 deletions .github/workflows/calibration.yml
@@ -15,11 +15,22 @@ jobs:
- name: Build Lambda Docker Image
run: |
cd lambda_function
docker build -t processing_function:latest .
export BASE_IMAGE=public.ecr.aws/w5r9l1c8/dev-padre-swsoc-docker-lambda-base:latest
export REQUIREMENTS_FILE=padre-requirements.txt
docker build \
--build-arg BASE_IMAGE=$BASE_IMAGE \
--build-arg REQUIREMENTS_FILE=$REQUIREMENTS_FILE \
-t processing_function:latest .

- name: Run Lambda Docker Container
run: |
docker run -d --name processing_lambda -p 9000:8080 -e USE_INSTRUMENT_TEST_DATA=True -e SWXSOC_MISSION=hermes processing_function:latest
docker run -d \
--name processing_lambda \
-p 9000:8080 \
-v "${{ github.workspace }}/lambda_function/tests/test_data/:/test_data" \
-e SWXSOC_MISSION=padre \
-e SDC_AWS_FILE_PATH=/test_data/padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv \
processing_function:latest
container_id=$(docker ps -qf "ancestor=processing_function:latest")
echo "Container ID: $container_id"

@@ -30,7 +41,7 @@
id: test-lambda
run: |
# Run curl and write the HTTP status code to a variable
HTTP_STATUS=$(curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_eea_event.json)
HTTP_STATUS=$(curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_craft_event.json)
echo "HTTP Status: $HTTP_STATUS"

# Grep the HTTP status code from the curl output for 200 (success)
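A note on the new wiring in the run step: it mounts the repo's test data into the container and points the handler at one file via `SDC_AWS_FILE_PATH`. A rough sketch of the mechanism in Python (our assumption; the handler code that actually reads this variable is not part of this diff):

```python
import os
from pathlib import Path

# Hypothetical handler-side logic: prefer the locally mounted test file
# injected via `-v .../test_data:/test_data` and `-e SDC_AWS_FILE_PATH=...`
file_path = os.getenv("SDC_AWS_FILE_PATH")
if file_path and Path(file_path).exists():
    print(f"Processing mounted test file: {file_path}")
else:
    print("SDC_AWS_FILE_PATH not set or file missing; expecting an S3 event instead")
```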
23 changes: 19 additions & 4 deletions README.md
@@ -13,15 +13,30 @@ The container will contain the latest release code as the production environment
### **Testing Locally (Using Your Own Test Data)**:
1. Build the lambda container image you'd like to test (from within the `lambda_function` folder):

`docker build -t processing_function:latest . --no-cache`
```sh
# Optional: set BASE_IMAGE and REQUIREMENTS_FILE to override the defaults
docker build \
--build-arg BASE_IMAGE=$BASE_IMAGE \
--build-arg REQUIREMENTS_FILE=$REQUIREMENTS_FILE \
-t sdc_aws_processing_lambda:latest . \
--network host
```

2. Run the lambda container image you've built; this starts the lambda runtime environment:

`docker run -p 9000:8080 -v <directory_for_processed_files>:/test_data -e SDC_AWS_FILE_PATH=/test_data/<file_to_process_name> processing_function:latest`
```sh
docker run \
-p 9000:8080 \
-v <directory_for_processed_files>:/test_data \
-e SDC_AWS_FILE_PATH=/test_data/<file_to_process_name> \
sdc_aws_processing_lambda:latest
```

3. From a separate terminal, make a curl request to the running lambda function (a Python alternative is sketched after this list):

`curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_eea_event.json`
```sh
curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" \
-d @lambda_function/tests/test_data/test_eea_event.json
```

4. Close the original terminal running the Docker container.
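
If you prefer to script step 3, a minimal Python equivalent of the curl call is sketched below (an assumption on our part: it uses the third-party `requests` package, which is not a dependency of this repo, against the same Lambda Runtime Interface Emulator endpoint):

```python
import json

import requests  # assumed available: pip install requests

# Load the same test event that the curl example posts
with open("lambda_function/tests/test_data/test_eea_event.json") as f:
    event = json.load(f)

# Invoke the locally running Lambda emulator
resp = requests.post(
    "http://localhost:9000/2015-03-31/functions/function/invocations",
    json=event,
    timeout=60,
)
print(resp.status_code)
print(resp.text)  # the function's return value
```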

@@ -50,4 +65,4 @@ The container will contain the latest release code as the production environment


### **How this Lambda Function is deployed**
This lambda function is part of the main SWxSOC Pipeline ([Architecture Repo Link](https://github.com/HERMES-SOC/sdc_aws_pipeline_architecture)). It is deployed via AWS CodeBuild within that repository. It is first built and tagged within the appropriate production or development repository (depending on whether it is a release or a commit). View the CodeBuild CI/CD file [here](buildspec.yml).
This lambda function is part of the main SWxSOC Pipeline ([Architecture Repo Link](https://github.com/swxsoc/sdc_aws_architecture)). It is deployed via AWS CodeBuild within that repository. It is first built and tagged within the appropriate production or development repository (depending on whether it is a release or a commit). View the CodeBuild CI/CD file [here](buildspec.yml).
1 change: 1 addition & 0 deletions lambda_function/padre-requirements.txt
@@ -2,4 +2,5 @@ sdc_aws_utils @ git+https://github.com/swxsoc/sdc_aws_utils.git
metatracker @ git+https://github.com/swxsoc/MetaTracker.git
padre_meddea @ git+https://github.com/PADRESat/padre_meddea.git
padre_sharp @ git+https://github.com/PADRESat/padre_sharp.git
padre_craft @ git+https://github.com/PADRESat/padre_craft.git
tenacity==9.1.2
228 changes: 135 additions & 93 deletions lambda_function/src/file_processor/file_processor.py
@@ -12,6 +12,7 @@
from itertools import combinations
import shutil
import traceback
from typing import Callable, Tuple

import swxsoc

@@ -164,10 +165,10 @@ def _process_file(self) -> None:
)

FileProcessor._track_file_metatracker(
science_filename_parser,
Path(file_path),
self.file_key,
self.instrument_bucket_name,
science_filename_parser=science_filename_parser,
file_path=Path(file_path),
s3_key=self.file_key,
s3_bucket=self.instrument_bucket_name,
status=status,
)

@@ -182,15 +183,16 @@ def _process_file(self) -> None:
# If calibrated files are found, set status to success
status = self.build_status(
status=Status.SUCCESS,
message=f"File Processed Successfully",
message="File Processed Successfully",
total_time=total_time,
)

# Track the original science file as processed successfully
science_file_id, science_product_id = FileProcessor._track_file_metatracker(
science_filename_parser,
Path(file_path),
self.file_key,
self.instrument_bucket_name,
science_filename_parser=science_filename_parser,
file_path=Path(file_path),
s3_key=self.file_key,
s3_bucket=self.instrument_bucket_name,
status=status,
)

@@ -203,6 +205,10 @@ def _process_file(self) -> None:
}
)

# Filter out None values from calibrated filenames
calibrated_filenames = [
fname for fname in calibrated_filenames if fname is not None
]
# Push file to S3 Bucket
for calibrated_filename in calibrated_filenames:
status = self.build_status(
@@ -220,11 +226,11 @@

# Track the calibrated file in the CDF Tracker
self._track_file_metatracker(
science_filename_parser,
Path("/tmp") / calibrated_filename,
calibrated_filename,
destination_bucket,
science_product_id,
science_filename_parser=science_filename_parser,
file_path=Path("/tmp") / calibrated_filename,
s3_key=calibrated_filename,
s3_bucket=destination_bucket,
science_product_id=science_product_id,
status=status,
)

@@ -260,43 +266,61 @@ def _calibrate_file(instrument, file_path, dry_run=False):
fromlist=["data"],
)
# Get all files in test data directory
test_data_dir = Path(instr_pkg_data.__path__[0])
test_data_dir = Path(instr_pkg_data.__path__[0]) / "test"
log.info(f"Test data directory: {test_data_dir}")
test_data_files = list(test_data_dir.glob("**/*"))
log.info(f"Found {len(test_data_files)} files in test data directory")
log.info(f"Using {test_data_files} as test data")
# Get any files ending in .bin, .cdf, .fits, or .csv and calibrate them
# Stub path list for calibrated files
path_list = []
# Loop the test data files for calibration
for test_data_file in test_data_files:
if test_data_file.suffix in [".bin", ".cdf", ".fits"]:
log.info(f"Calibrating {test_data_file}")
if test_data_file.suffix in [".bin", ".cdf", ".fits", ".csv"]:
log.info(f"Calibrating {test_data_file.name}")
# Make /test_data directory if it doesn't exist
Path("/test_data").mkdir(parents=True, exist_ok=True)
# Copy file to /test_data directory using shutil
test_data_file_path = Path(test_data_file)
file_path = Path(f"/test_data/{test_data_file_path.name}")
shutil.copy(test_data_file_path, file_path)
# Calibrate file
calibrated_filename = calibration.process_file(file_path)[0]
# Copy calibrated file to test data directory
calibrated_file_path = Path(calibrated_filename)
# Return name of calibrated file
log.info(f"Calibrated file saved as {calibrated_file_path}")

return calibrated_filename
files_list = calibration.process_file(file_path)

if len(files_list) == 0:
log.warning(
f"No calibrated files generated for {file_path}"
)
continue
for generated_file in files_list:
if generated_file is not None:
new_file_path = Path(generated_file)
calibrated_filename = new_file_path.name
path_list.append(calibrated_filename)
log.info(
f"Calibrated file saved as {calibrated_filename}"
)
else:
# Pass through None values to indicate no file was created
path_list.append(None)
log.warning(f"'None' file generated for {file_path}")
# Return list of calibrated files
return path_list

# If no files with a supported suffix are found, raise an error
raise FileNotFoundError(
"No files ending in .bin, .cdf, .fits, or .csv found in test data directory"
)
log.info(f"Calibrating {file_path}")
# Get name of new file
files_list = calibration.process_file(Path(file_path))

path_list = []
for generated_file in files_list:
new_file_path = Path(generated_file)
calibrated_filename = new_file_path.name
path_list.append(calibrated_filename)
log.info(f"Calibrated file saved as {calibrated_filename}")
if generated_file is not None:
new_file_path = Path(generated_file)
calibrated_filename = new_file_path.name
path_list.append(calibrated_filename)
log.info(f"Calibrated file saved as {calibrated_filename}")
else:
# Pass through None values to indicate no file was created
path_list.append(None)
log.warning(f"'None' file generated for {file_path}")

return path_list
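
Worth noting for callers: `_calibrate_file` now returns a list of filenames that may contain `None` placeholders, instead of a single filename. A minimal sketch of the resulting calling pattern, assuming the method is static as its signature suggests (the import path, instrument name, and input path below are hypothetical):

```python
from pathlib import Path

from file_processor.file_processor import FileProcessor  # assumed import path

# Hypothetical call; mirrors how _process_file consumes the new return value
path_list = FileProcessor._calibrate_file(
    instrument="meddea",
    file_path=Path("/test_data/example.bin"),
)

# Drop the None placeholders before uploading, as _process_file now does
calibrated_filenames = [fname for fname in path_list if fname is not None]
for fname in calibrated_filenames:
    print(f"Would upload {fname} to the destination bucket")
```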

@@ -336,84 +360,102 @@ def _calibrate_file(instrument, file_path, dry_run=False):
reraise=True,
)
def _track_file_metatracker(
science_filename_parser,
file_path,
s3_key,
s3_bucket,
science_product_id=None,
status=None,
) -> int:
science_filename_parser: Callable,
file_path: Path,
s3_key: str,
s3_bucket: str,
science_product_id: int = None,
status: dict = None,
) -> Tuple[int, int]:
"""
Tracks processed science product in the CDF Tracker file database.
Tracks a processed science product in the File Metadata tracker database.
It involves initializing the database engine, setting up database tables,
and tracking both the original and processed files.

:param science_filename_parser: The parser function to process file names.
:type science_filename_parser: function
:param file_path: The path of the original file.
:type file_path: Path
Parameters
----------
science_filename_parser : function
The parser function to process file names.
file_path : Path
The path of the file in the filesystem.
s3_key : str
The S3 key of the file.
s3_bucket : str
The S3 bucket of the file.
science_product_id : int, optional
The ID of the science product, by default None.
status : dict, optional
The status dictionary for tracking, by default None.

Returns
-------
Tuple[int, int]
A tuple containing the science file ID and science product ID, or
``(None, None)`` if tracking fails or no RDS secret is configured.
"""
secret_arn = os.getenv("RDS_SECRET_ARN", None)
if secret_arn:
try:
# Validate file path
if not file_path or not isinstance(file_path, Path):
raise ValueError("Invalid file path provided.")
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")

# Get Database Credentials
session = boto3.session.Session()
client = session.client(service_name="secretsmanager")
response = client.get_secret_value(SecretId=secret_arn)
secret = json.loads(response["SecretString"])
connection_string = (
f"postgresql://{secret['username']}:{secret['password']}@"
f"{secret['host']}:{secret['port']}/{secret['dbname']}"
)
if not secret_arn:
log.error(
f"Failed to update MetaTracker for file {file_path}. No RDS Secret ARN found in environment variables.",
)
return None, None

metatracker_config = FileProcessor.get_metatracker_config(swxsoc.config)
try:
# Validate file path
if not file_path or not isinstance(file_path, Path):
raise ValueError("Invalid file path provided.")
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")

# Get Database Credentials
session = boto3.session.Session()
client = session.client(service_name="secretsmanager")
response = client.get_secret_value(SecretId=secret_arn)
secret = json.loads(response["SecretString"])
connection_string = (
f"postgresql://{secret['username']}:{secret['password']}@"
f"{secret['host']}:{secret['port']}/{secret['dbname']}"
)

log.debug(swxsoc.config)
metatracker_config = FileProcessor.get_metatracker_config(swxsoc.config)

log.debug(metatracker_config)
log.debug(swxsoc.config)

metatracker.set_config(metatracker_config)
log.debug(metatracker_config)

from metatracker.database import create_engine
from metatracker.database.tables import create_tables
from metatracker.tracker import tracker
metatracker.set_config(metatracker_config)

# Initialize the database engine
database_engine = create_engine(connection_string)
from metatracker.database import create_engine
from metatracker.database.tables import create_tables
from metatracker.tracker import tracker

# Create tables if they do not exist
create_tables(database_engine)
# Initialize the database engine
database_engine = create_engine(connection_string)

# Set tracker to MetaTracker
meta_tracker = tracker.MetaTracker(
database_engine, science_filename_parser
)
# Create tables if they do not exist
create_tables(database_engine)

if meta_tracker:
science_file_id, science_product_id = meta_tracker.track(
file_path, s3_key, s3_bucket, status=status
)
# Set tracker to MetaTracker
meta_tracker = tracker.MetaTracker(database_engine, science_filename_parser)

return science_file_id, science_product_id
if meta_tracker:
science_file_id, science_product_id = meta_tracker.track(
file_path, s3_key, s3_bucket, status=status
)

return None
return science_file_id, science_product_id

except Exception as e:
log.error(
{
"status": "ERROR",
"message": str(e),
"traceback": traceback.format_exc(),
}
)
return None
return None, None

except Exception as e:
log.error(
{
"status": "ERROR",
"message": str(e),
"traceback": traceback.format_exc(),
}
)
return None, None

@staticmethod
def get_metatracker_config(swxsoc_config: dict) -> dict:
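
For orientation, here is a minimal sketch of how the refactored `_track_file_metatracker` helper is now invoked, mirroring the keyword-argument call sites earlier in this diff (the import path, parser, status dict, bucket, and file names below are placeholders):

```python
from pathlib import Path

from file_processor.file_processor import FileProcessor  # assumed import path

def science_filename_parser(filename: str) -> dict:
    """Placeholder parser; the real one comes from the mission configuration."""
    return {"filename": filename}

status = {"status": "SUCCESS"}  # placeholder status dict

# Hypothetical call site; mirrors _process_file's keyword-argument style
science_file_id, science_product_id = FileProcessor._track_file_metatracker(
    science_filename_parser=science_filename_parser,
    file_path=Path("/tmp/example.fits"),
    s3_key="example.fits",
    s3_bucket="instrument-bucket",
    status=status,
)

# The helper now returns (None, None) on failure or when no RDS secret is set,
# instead of raising, so callers should guard on the returned IDs
if science_file_id is None:
    print("Tracking skipped or failed; see logs for details")
```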