diff --git a/.github/workflows/calibration.yml b/.github/workflows/calibration.yml index ff3f24f..1ff12d1 100644 --- a/.github/workflows/calibration.yml +++ b/.github/workflows/calibration.yml @@ -15,11 +15,22 @@ jobs: - name: Build Lambda Docker Image run: | cd lambda_function - docker build -t processing_function:latest . + export BASE_IMAGE=public.ecr.aws/w5r9l1c8/dev-padre-swsoc-docker-lambda-base:latest + export REQUIREMENTS_FILE=padre-requirements.txt + docker build \ + --build-arg BASE_IMAGE=$BASE_IMAGE \ + --build-arg REQUIREMENTS_FILE=$REQUIREMENTS_FILE \ + -t processing_function:latest . - name: Run Lambda Docker Container run: | - docker run -d --name processing_lambda -p 9000:8080 -e USE_INSTRUMENT_TEST_DATA=True -e SWXSOC_MISSION=hermes processing_function:latest + docker run -d \ + --name processing_lambda \ + -p 9000:8080 \ + -v "${{ github.workspace }}/lambda_function/tests/test_data/:/test_data" \ + -e SWXSOC_MISSION=padre \ + -e SDC_AWS_FILE_PATH=/test_data/padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv \ + processing_function:latest container_id=$(docker ps -qf "ancestor=processing_function:latest") echo "Container ID: $container_id" @@ -30,7 +41,7 @@ jobs: id: test-lambda run: | # Run curl and write the HTTP status code to a variable - HTTP_STATUS=$(curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_eea_event.json) + HTTP_STATUS=$(curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_craft_event.json) echo "HTTP Status: $HTTP_STATUS" # Grep the HTTP status code from the curl output for 200 (success) diff --git a/README.md b/README.md index b1dcc11..bd0c7b6 100755 --- a/README.md +++ b/README.md @@ -13,15 +13,30 @@ The container will contain the latest release code as the production environment ### **Testing Locally (Using own Test Data)**: 1. Build the lambda container image (from within the lambda_function folder) you'd like to test: - `docker build -t processing_function:latest . --no-cache` +```sh +docker build \ + --build-arg BASE_IMAGE=$BASE_IMAGE \ # Optional: specify base image + --build-arg REQUIREMENTS_FILE=$REQUIREMENTS_FILE \ # Optional: specify requirements file + -t sdc_aws_processing_lambda:latest . \ + --network host +``` 2. Run the lambda container image you've built, this will start the lambda runtime environment: - `docker run -p 9000:8080 -v :/test_data -e SDC_AWS_FILE_PATH=/test_data/ processing_function:latest` +```sh +docker run \ + -p 9000:8080 \ + -v :/test_data \ + -e SDC_AWS_FILE_PATH=/test_data/ \ + sdc_aws_processing_lambda:latest +``` 3. From a `separate` terminal, make a curl request to the running lambda function: - `curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_eea_event.json` +```sh +curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" \ + -d @lambda_function/tests/test_data/test_eea_event.json +``` 4. Close original terminal running the docker image. @@ -50,4 +65,4 @@ The container will contain the latest release code as the production environment ### **How this Lambda Function is deployed** -This lambda function is part of the main SWxSOC Pipeline ([Architecture Repo Link](https://github.com/HERMES-SOC/sdc_aws_pipeline_architecture)). It is deployed via AWS Codebuild within that repository. It is first built and tagged within the appropriate production or development repository (depending if it is a release or commit). View the Codebuild CI/CD file [here](buildspec.yml). \ No newline at end of file +This lambda function is part of the main SWxSOC Pipeline ([Architecture Repo Link](https://github.com/swxsoc/sdc_aws_architecture)). It is deployed via AWS Codebuild within that repository. It is first built and tagged within the appropriate production or development repository (depending if it is a release or commit). View the Codebuild CI/CD file [here](buildspec.yml). \ No newline at end of file diff --git a/lambda_function/padre-requirements.txt b/lambda_function/padre-requirements.txt index 8f55b4b..af4f80f 100755 --- a/lambda_function/padre-requirements.txt +++ b/lambda_function/padre-requirements.txt @@ -2,4 +2,5 @@ sdc_aws_utils @ git+https://github.com/swxsoc/sdc_aws_utils.git metatracker @ git+https://github.com/swxsoc/MetaTracker.git padre_meddea @ git+https://github.com/PADRESat/padre_meddea.git padre_sharp @ git+https://github.com/PADRESat/padre_sharp.git +padre_craft @ git+https://github.com/PADRESat/padre_craft.git tenacity==9.1.2 \ No newline at end of file diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index 054e18e..0b295e0 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -12,6 +12,7 @@ from itertools import combinations import shutil import traceback +from typing import Tuple import swxsoc @@ -164,10 +165,10 @@ def _process_file(self) -> None: ) FileProcessor._track_file_metatracker( - science_filename_parser, - Path(file_path), - self.file_key, - self.instrument_bucket_name, + science_filename_parser=science_filename_parser, + file_path=Path(file_path), + s3_key=self.file_key, + s3_bucket=self.instrument_bucket_name, status=status, ) @@ -182,15 +183,16 @@ def _process_file(self) -> None: # If calibrated files are found, set status to success status = self.build_status( status=Status.SUCCESS, - message=f"File Processed Successfully", + message="File Processed Successfully", total_time=total_time, ) + # Track the original science file as processed successfully science_file_id, science_product_id = FileProcessor._track_file_metatracker( - science_filename_parser, - Path(file_path), - self.file_key, - self.instrument_bucket_name, + science_filename_parser=science_filename_parser, + file_path=Path(file_path), + s3_key=self.file_key, + s3_bucket=self.instrument_bucket_name, status=status, ) @@ -203,6 +205,10 @@ def _process_file(self) -> None: } ) + # Filter out None values from calibrated filenames + calibrated_filenames = [ + fname for fname in calibrated_filenames if fname is not None + ] # Push file to S3 Bucket for calibrated_filename in calibrated_filenames: status = self.build_status( @@ -220,11 +226,11 @@ def _process_file(self) -> None: # Track the calibrated file in the CDF Tracker self._track_file_metatracker( - science_filename_parser, - Path("/tmp") / calibrated_filename, - calibrated_filename, - destination_bucket, - science_product_id, + science_filename_parser=science_filename_parser, + file_path=Path("/tmp") / calibrated_filename, + s3_key=calibrated_filename, + s3_bucket=destination_bucket, + science_product_id=science_product_id, status=status, ) @@ -260,14 +266,17 @@ def _calibrate_file(instrument, file_path, dry_run=False): fromlist=["data"], ) # Get all files in test data directory - test_data_dir = Path(instr_pkg_data.__path__[0]) + test_data_dir = Path(instr_pkg_data.__path__[0]) / "test" + log.info(f"Test data directory: {test_data_dir}") test_data_files = list(test_data_dir.glob("**/*")) log.info(f"Found {len(test_data_files)} files in test data directory") log.info(f"Using {test_data_files} as test data") - # Get any files ending in .bin or .cdf and calibrate them + # Stub path list for calibrated files + path_list = [] + # Loop the test data files for calibration for test_data_file in test_data_files: - if test_data_file.suffix in [".bin", ".cdf", ".fits"]: - log.info(f"Calibrating {test_data_file}") + if test_data_file.suffix in [".bin", ".cdf", ".fits", ".csv"]: + log.info(f"Calibrating {test_data_file.name}") # Make /test_data directory if it doesn't exist Path("/test_data").mkdir(parents=True, exist_ok=True) # Copy file to /test_data directory using shutil @@ -275,28 +284,43 @@ def _calibrate_file(instrument, file_path, dry_run=False): file_path = Path(f"/test_data/{test_data_file_path.name}") shutil.copy(test_data_file_path, file_path) # Calibrate file - calibrated_filename = calibration.process_file(file_path)[0] - # Copy calibrated file to test data directory - calibrated_file_path = Path(calibrated_filename) - # Return name of calibrated file - log.info(f"Calibrated file saved as {calibrated_file_path}") - - return calibrated_filename + files_list = calibration.process_file(file_path) + + if len(files_list) == 0: + log.warning( + f"No calibrated files generated for {file_path}" + ) + continue + for generated_file in files_list: + if generated_file is not None: + new_file_path = Path(generated_file) + calibrated_filename = new_file_path.name + path_list.append(calibrated_filename) + log.info( + f"Calibrated file saved as {calibrated_filename}" + ) + else: + # Pass-through None values to indicate no file was created + path_list.append(None) + log.warning(f"'None' file generated for {file_path}") + # Return list of calibrated files + return path_list - # If no files ending in .bin or .cdf are found, raise an error - raise FileNotFoundError( - "No files ending in .bin or .cdf found in test data directory" - ) log.info(f"Calibrating {file_path}") # Get name of new file files_list = calibration.process_file(Path(file_path)) path_list = [] for generated_file in files_list: - new_file_path = Path(generated_file) - calibrated_filename = new_file_path.name - path_list.append(calibrated_filename) - log.info(f"Calibrated file saved as {calibrated_filename}") + if generated_file is not None: + new_file_path = Path(generated_file) + calibrated_filename = new_file_path.name + path_list.append(calibrated_filename) + log.info(f"Calibrated file saved as {calibrated_filename}") + else: + # Pass-through None values to indicate no file was created + path_list.append(None) + log.warning(f"'None' file generated for {file_path}") return path_list @@ -336,84 +360,102 @@ def _calibrate_file(instrument, file_path, dry_run=False): reraise=True, ) def _track_file_metatracker( - science_filename_parser, - file_path, - s3_key, - s3_bucket, - science_product_id=None, - status=None, - ) -> int: + science_filename_parser: callable, + file_path: Path, + s3_key: str, + s3_bucket: str, + science_product_id: int = None, + status: dict = None, + ) -> Tuple[int, int]: """ - Tracks processed science product in the CDF Tracker file database. + Tracks processed science product in the File Metadata tracker database. It involves initializing the database engine, setting up database tables, and tracking both the original and processed files. - :param science_filename_parser: The parser function to process file names. - :type science_filename_parser: function - :param file_path: The path of the original file. - :type file_path: Path + Parameters + ---------- + science_filename_parser : function + The parser function to process file names. + file_path : Path + The path of the file in the filesystem. + s3_key : str + The S3 key of the file. + s3_bucket : str + The S3 bucket of the file. + science_product_id : int, optional + The ID of the science product, by default None. + status : dict, optional + The status dictionary for tracking, by default None. + + Returns + ------- + Tuple[int, int] + A tuple containing the science file ID and science product ID. """ secret_arn = os.getenv("RDS_SECRET_ARN", None) - if secret_arn: - try: - # Validate file path - if not file_path or not isinstance(file_path, Path): - raise ValueError("Invalid file path provided.") - # Check if file exists - if not file_path.exists(): - raise FileNotFoundError(f"File not found: {file_path}") - - # Get Database Credentials - session = boto3.session.Session() - client = session.client(service_name="secretsmanager") - response = client.get_secret_value(SecretId=secret_arn) - secret = json.loads(response["SecretString"]) - connection_string = ( - f"postgresql://{secret['username']}:{secret['password']}@" - f"{secret['host']}:{secret['port']}/{secret['dbname']}" - ) + if not secret_arn: + log.error( + f"Failed to update MetaTracker for file {file_path}. No RDS Secret ARN found in environment variables.", + ) + return None, None - metatracker_config = FileProcessor.get_metatracker_config(swxsoc.config) + try: + # Validate file path + if not file_path or not isinstance(file_path, Path): + raise ValueError("Invalid file path provided.") + # Check if file exists + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + # Get Database Credentials + session = boto3.session.Session() + client = session.client(service_name="secretsmanager") + response = client.get_secret_value(SecretId=secret_arn) + secret = json.loads(response["SecretString"]) + connection_string = ( + f"postgresql://{secret['username']}:{secret['password']}@" + f"{secret['host']}:{secret['port']}/{secret['dbname']}" + ) - log.debug(swxsoc.config) + metatracker_config = FileProcessor.get_metatracker_config(swxsoc.config) - log.debug(metatracker_config) + log.debug(swxsoc.config) - metatracker.set_config(metatracker_config) + log.debug(metatracker_config) - from metatracker.database import create_engine - from metatracker.database.tables import create_tables - from metatracker.tracker import tracker + metatracker.set_config(metatracker_config) - # Initialize the database engine - database_engine = create_engine(connection_string) + from metatracker.database import create_engine + from metatracker.database.tables import create_tables + from metatracker.tracker import tracker - # Create tables if they do not exist - create_tables(database_engine) + # Initialize the database engine + database_engine = create_engine(connection_string) - # Set tracker to MetaTracker - meta_tracker = tracker.MetaTracker( - database_engine, science_filename_parser - ) + # Create tables if they do not exist + create_tables(database_engine) - if meta_tracker: - science_file_id, science_product_id = meta_tracker.track( - file_path, s3_key, s3_bucket, status=status - ) + # Set tracker to MetaTracker + meta_tracker = tracker.MetaTracker(database_engine, science_filename_parser) - return science_file_id, science_product_id + if meta_tracker: + science_file_id, science_product_id = meta_tracker.track( + file_path, s3_key, s3_bucket, status=status + ) - return None + return science_file_id, science_product_id - except Exception as e: - log.error( - { - "status": "ERROR", - "message": str(e), - "traceback": traceback.format_exc(), - } - ) - return None + return None, None + + except Exception as e: + log.error( + { + "status": "ERROR", + "message": str(e), + "traceback": traceback.format_exc(), + } + ) + return None, None @staticmethod def get_metatracker_config(swxsoc_config: dict) -> dict: diff --git a/lambda_function/tests/test_data/padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv b/lambda_function/tests/test_data/padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv new file mode 100644 index 0000000..2af908a --- /dev/null +++ b/lambda_function/tests/test_data/padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv @@ -0,0 +1,11 @@ +run_mode,control_mode,magnetic_control_timeout,estimator_main,estimator_backup,adcs_op_state,timestamp_ms +1,12,65535,6,1,0,1762106119422 +1,12,65535,6,1,0,1762106179414 +1,12,65535,6,1,0,1761936771334 +1,12,65535,6,1,0,1761936831407 +1,12,65535,6,1,0,1761936891419 +1,12,65535,6,1,0,1761936951483 +1,12,65535,6,1,0,1761937011552 +1,12,65535,6,1,0,1761937071617 +1,12,65535,6,1,0,1761937131524 +1,12,65535,6,1,0,1761937191508 \ No newline at end of file diff --git a/lambda_function/tests/test_data/test_craft_event.json b/lambda_function/tests/test_data/test_craft_event.json new file mode 100644 index 0000000..0e2755d --- /dev/null +++ b/lambda_function/tests/test_data/test_craft_event.json @@ -0,0 +1,15 @@ +{ + "Records": [ + { + "Sns": { + "Message": "{\r\n \"Records\": [\r\n {\r\n \"eventSource\": \"aws:s3\",\r\n \"awsRegion\": \"us-east-1\",\r\n \"eventTime\": \"2025-11-13T14:14:10.000Z\",\r\n \"eventName\": \"ObjectCreated:Put\",\r\n \"s3\": {\r\n \"bucket\": {\r\n \"name\": \"dev-padre-craft\"\r\n },\r\n \"object\": {\r\n \"key\": \"padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv\",\r\n \"size\": 1485,\r\n \"eTag\": \"32d82e8a2e72af004c557c4e369e89ff\",\r\n \"sequencer\": \"0062DE63CC330B223E\"\r\n }\r\n }\r\n }\r\n ]\r\n}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + } + } + } + } + ] +} \ No newline at end of file