From a2374aa86fab416932370c66d2078edaf60d4054 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Tue, 6 Jan 2026 15:37:31 -0800 Subject: [PATCH] feature: add emr-serverless step for SageMaker Pipelines + sample notebook --- .../mlops/workflow/emr_serverless_step.py | 162 +++++ .../src/sagemaker/mlops/workflow/steps.py | 23 +- .../unit/workflow/test_emr_serverless_step.py | 171 ++++++ .../v3-emr-serverless-step-example.ipynb | 551 ++++++++++++++++++ 4 files changed, 897 insertions(+), 10 deletions(-) create mode 100644 sagemaker-mlops/src/sagemaker/mlops/workflow/emr_serverless_step.py create mode 100644 sagemaker-mlops/tests/unit/workflow/test_emr_serverless_step.py create mode 100644 v3-examples/ml-ops-examples/v3-emr-serverless-step-example.ipynb diff --git a/sagemaker-mlops/src/sagemaker/mlops/workflow/emr_serverless_step.py b/sagemaker-mlops/src/sagemaker/mlops/workflow/emr_serverless_step.py new file mode 100644 index 0000000000..07beed79a4 --- /dev/null +++ b/sagemaker-mlops/src/sagemaker/mlops/workflow/emr_serverless_step.py @@ -0,0 +1,162 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""The step definitions for EMR Serverless workflow.""" +from __future__ import absolute_import + +from typing import Any, Dict, List, Union, Optional + +from sagemaker.core.helper.pipeline_variable import RequestType +from sagemaker.core.workflow.properties import Properties +from sagemaker.mlops.workflow.retry import StepRetryPolicy +from sagemaker.mlops.workflow.step_collections import StepCollection +from sagemaker.mlops.workflow.steps import ConfigurableRetryStep, Step, StepTypeEnum, CacheConfig + + +class EMRServerlessJobConfig: + """Config for EMR Serverless job.""" + + def __init__( + self, + job_driver: Dict, + execution_role_arn: str, + configuration_overrides: Optional[Dict] = None, + execution_timeout_minutes: Optional[int] = None, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + ): # pylint: disable=too-many-positional-arguments + """Create a definition for EMR Serverless job configuration. + + Args: + job_driver (Dict): The job driver for the job run. + execution_role_arn (str): The execution role ARN for the job run. + configuration_overrides (Dict, optional): Configuration overrides for the job run. + execution_timeout_minutes (int, optional): The maximum duration for the job run. + name (str, optional): The optional job run name. + tags (Dict[str, str], optional): The tags assigned to the job run. 
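+
+        Example (illustrative sketch; the bucket, script path, and role ARN
+        below are placeholders, not defaults):
+
+            job_config = EMRServerlessJobConfig(
+                job_driver={"sparkSubmit": {"entryPoint": "s3://my-bucket/script.py"}},
+                execution_role_arn="arn:aws:iam::111122223333:role/EMRServerlessJobRole",
+                execution_timeout_minutes=60,
+            )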
+ """ + self.job_driver = job_driver + self.execution_role_arn = execution_role_arn + self.configuration_overrides = configuration_overrides + self.execution_timeout_minutes = execution_timeout_minutes + self.name = name + self.tags = tags + + def to_request(self, application_id: Optional[str] = None) -> RequestType: + """Convert EMRServerlessJobConfig object to request dict.""" + config = {"executionRoleArn": self.execution_role_arn, "jobDriver": self.job_driver} + if application_id is not None: + config["applicationId"] = application_id + if self.configuration_overrides is not None: + config["configurationOverrides"] = self.configuration_overrides + if self.execution_timeout_minutes is not None: + config["executionTimeoutMinutes"] = self.execution_timeout_minutes + if self.name is not None: + config["name"] = self.name + if self.tags is not None: + config["tags"] = self.tags + return config + + +ERR_STR_WITH_BOTH_APP_ID_AND_APP_CONFIG = ( + "EMRServerlessStep {step_name} cannot have both application_id and application_config. " + "To use EMRServerlessStep with application_config, " + "application_id must be explicitly set to None." +) + +ERR_STR_WITHOUT_APP_ID_AND_APP_CONFIG = ( + "EMRServerlessStep {step_name} must have either application_id or application_config" +) + + +class EMRServerlessStep(ConfigurableRetryStep): + """EMR Serverless step for workflow with configurable retry policies.""" + + def __init__( + self, + name: str, + display_name: str, + description: str, + job_config: EMRServerlessJobConfig, + application_id: Optional[str] = None, + application_config: Optional[Dict[str, Any]] = None, + depends_on: Optional[List[Union[str, Step, StepCollection]]] = None, + cache_config: Optional[CacheConfig] = None, + retry_policies: Optional[List[StepRetryPolicy]] = None, + ): # pylint: disable=too-many-positional-arguments + """Constructs an `EMRServerlessStep`. + + Args: + name (str): The name of the EMR Serverless step. + display_name (str): The display name of the EMR Serverless step. + description (str): The description of the EMR Serverless step. + job_config (EMRServerlessJobConfig): Job configuration for the EMR Serverless job. + application_id (str, optional): The ID of the existing EMR Serverless application. + application_config (Dict[str, Any], optional): Configuration for creating a new + EMR Serverless application. + depends_on (List[Union[str, Step, StepCollection]], optional): A list of + `Step`/`StepCollection` names or `Step` instances or `StepCollection` instances + that this `EMRServerlessStep` depends on. + cache_config (CacheConfig, optional): A `sagemaker.workflow.steps.CacheConfig` instance. + retry_policies (List[StepRetryPolicy], optional): A list of retry policies. 
+ """ + super().__init__( + name=name, + step_type=StepTypeEnum.EMR_SERVERLESS, + display_name=display_name, + description=description, + depends_on=depends_on, + retry_policies=retry_policies, + ) + + if application_id is None and application_config is None: + raise ValueError(ERR_STR_WITHOUT_APP_ID_AND_APP_CONFIG.format(step_name=name)) + + if application_id is not None and application_config is not None: + raise ValueError(ERR_STR_WITH_BOTH_APP_ID_AND_APP_CONFIG.format(step_name=name)) + + emr_serverless_args = { + "ExecutionRoleArn": job_config.execution_role_arn, # Top-level role (used by backend) + "JobConfig": job_config.to_request( + application_id + ), # Role also in JobConfig (structure requirement) + } + + if application_id is not None: + emr_serverless_args["ApplicationId"] = application_id + elif application_config is not None: + emr_serverless_args["ApplicationConfig"] = application_config + + self.args = emr_serverless_args + self.cache_config = cache_config + + root_property = Properties( + step_name=name, step=self, shape_name="GetJobRunResponse", service_name="emr-serverless" + ) + self._properties = root_property + + @property + def arguments(self) -> RequestType: + """The arguments dict that is used to call EMR Serverless APIs.""" + return self.args + + @property + def properties(self) -> RequestType: + """A Properties object representing the EMR Serverless GetJobRunResponse model.""" + return self._properties + + def to_request(self) -> RequestType: + """Updates the dictionary with cache configuration and retry policies.""" + request_dict = super().to_request() + if self.cache_config: + request_dict.update(self.cache_config.config) + return request_dict diff --git a/sagemaker-mlops/src/sagemaker/mlops/workflow/steps.py b/sagemaker-mlops/src/sagemaker/mlops/workflow/steps.py index 295fb633ff..5e0eb3dda3 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/workflow/steps.py +++ b/sagemaker-mlops/src/sagemaker/mlops/workflow/steps.py @@ -20,6 +20,7 @@ import attr from sagemaker.core.local.local_session import LocalSagemakerClient + # Primitive imports (stay in core) from sagemaker.core.workflow.entities import Entity from sagemaker.core.helper.pipeline_variable import RequestType @@ -30,6 +31,7 @@ ) from sagemaker.core.helper.pipeline_variable import PipelineVariable from sagemaker.core.workflow.functions import Join, JsonGet + # Orchestration imports (now in mlops) from sagemaker.mlops.workflow.retry import RetryPolicy from sagemaker.core.workflow.step_outputs import StepOutput @@ -57,6 +59,7 @@ class StepTypeEnum(Enum): QUALITY_CHECK = "QualityCheck" CLARIFY_CHECK = "ClarifyCheck" EMR = "EMR" + EMR_SERVERLESS = "EMRServerless" FAIL = "Fail" AUTOML = "AutoML" @@ -417,6 +420,7 @@ def __init__( if step_args: from sagemaker.core.workflow.utilities import validate_step_args_input + # Lazy import to avoid circular dependency from sagemaker.train.model_trainer import ModelTrainer @@ -436,7 +440,7 @@ def __init__( def arguments(self) -> RequestType: """The arguments dictionary that is used to call `create_training_job`. - NOTE: The `CreateTrainingJob` request is not quite the args list that workflow needs. + NOTE: The `CreateTrainingJob` request is not quite the args list that workflow needs. 
""" from sagemaker.core.workflow.utilities import execute_job_functions from sagemaker.core.workflow.utilities import _pipeline_config @@ -451,7 +455,7 @@ def arguments(self) -> RequestType: request_dict = model_trainer.sagemaker_session.context.args else: raise ValueError("step_args input is required.") - + if "HyperParameters" in request_dict: request_dict["HyperParameters"].pop("sagemaker_job_name", None) @@ -606,11 +610,13 @@ def __init__( raise ValueError("step_args is required for ProcessingStep.") from sagemaker.core.workflow.utilities import validate_step_args_input - validate_step_args_input( step_args=step_args, - expected_caller={Processor.run.__name__, LocalSagemakerClient().create_processing_job.__name__}, + expected_caller={ + Processor.run.__name__, + LocalSagemakerClient().create_processing_job.__name__, + }, error_message=f"The step_args of ProcessingStep must be obtained from processor.run() or in local mode, not {step_args.caller_name}", ) @@ -638,7 +644,7 @@ def arguments(self) -> RequestType: # populate request dict with args processor = self.step_args.func_args[0] request_dict = processor.sagemaker_session.context.args - + # Continue to pop job name if not explicitly opted-in via config request_dict = trim_request_dict(request_dict, "ProcessingJobName", _pipeline_config) @@ -663,8 +669,6 @@ def to_request(self) -> RequestType: return request_dict - - class TuningStep(ConfigurableRetryStep): """`TuningStep` for SageMaker Pipelines Workflows.""" @@ -698,7 +702,7 @@ def __init__( name, StepTypeEnum.TUNING, display_name, description, depends_on, retry_policies ) - if not step_args : + if not step_args: raise ValueError("step_args is required for TuningStep.") from sagemaker.core.workflow.utilities import validate_step_args_input @@ -737,7 +741,6 @@ def arguments(self) -> RequestType: # populate request dict with args tuner = self.step_args.func_args[0] request_dict = tuner.sagemaker_session.context.args - # Continue to pop job name if not explicitly opted-in via config request_dict = trim_request_dict( @@ -785,4 +788,4 @@ def get_top_model_s3_uri(self, top_k: int, s3_bucket: str, prefix: str = "") -> self.properties.TrainingJobSummaries[top_k].TrainingJobName, "output/model.tar.gz", ], - ) \ No newline at end of file + ) diff --git a/sagemaker-mlops/tests/unit/workflow/test_emr_serverless_step.py b/sagemaker-mlops/tests/unit/workflow/test_emr_serverless_step.py new file mode 100644 index 0000000000..fbba7a2660 --- /dev/null +++ b/sagemaker-mlops/tests/unit/workflow/test_emr_serverless_step.py @@ -0,0 +1,171 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+"""Unit tests for EMR Serverless step.""" + +from __future__ import absolute_import + +import pytest +from sagemaker.mlops.workflow.emr_serverless_step import EMRServerlessStep +from sagemaker.mlops.workflow.emr_serverless_step import EMRServerlessJobConfig + + +class TestEMRServerlessJobConfig: + """Test EMRServerlessJobConfig class.""" + + def test_job_config_structure(self): + """Test EMRServerlessJobConfig creates correct request structure.""" + job_config = EMRServerlessJobConfig( + job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessRole", + configuration_overrides={ + "applicationConfiguration": [ + { + "classification": "spark-defaults", + "properties": {"spark.sql.adaptive.enabled": "true"}, + } + ] + }, + ) + + expected = { + "executionRoleArn": "arn:aws:iam::123456789012:role/EMRServerlessRole", + "jobDriver": {"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + "configurationOverrides": { + "applicationConfiguration": [ + { + "classification": "spark-defaults", + "properties": {"spark.sql.adaptive.enabled": "true"}, + } + ] + }, + } + + assert job_config.to_request() == expected + + +class TestEMRServerlessStep: + """Test EMRServerlessStep class.""" + + def test_existing_application_step(self): + """Test EMRServerlessStep with existing application ID.""" + job_config = EMRServerlessJobConfig( + job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessRole", + ) + + step = EMRServerlessStep( + name="test-step", + display_name="Test Step", + description="Test Description", + job_config=job_config, + application_id="app-123", + ) + + expected_args = { + "ExecutionRoleArn": "arn:aws:iam::123456789012:role/EMRServerlessRole", + "ApplicationId": "app-123", + "JobConfig": { + "applicationId": "app-123", + "executionRoleArn": "arn:aws:iam::123456789012:role/EMRServerlessRole", + "jobDriver": {"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + }, + } + + assert step.arguments == expected_args + + def test_new_application_step(self): + """Test EMRServerlessStep with new application config.""" + job_config = EMRServerlessJobConfig( + job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessRole", + ) + + step = EMRServerlessStep( + name="test-step", + display_name="Test Step", + description="Test Description", + job_config=job_config, + application_config={ + "name": "test-application", + "releaseLabel": "emr-6.15.0", + "type": "SPARK", + }, + ) + + expected_args = { + "ExecutionRoleArn": "arn:aws:iam::123456789012:role/EMRServerlessRole", + "ApplicationConfig": { + "name": "test-application", + "releaseLabel": "emr-6.15.0", + "type": "SPARK", + }, + "JobConfig": { + "executionRoleArn": "arn:aws:iam::123456789012:role/EMRServerlessRole", + "jobDriver": {"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + }, + } + + assert step.arguments == expected_args + + def test_validation_errors(self): + """Test EMRServerlessStep raises errors for invalid inputs.""" + job_config = EMRServerlessJobConfig( + job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessRole", + ) + + # Should raise error when neither provided + with pytest.raises( + ValueError, match="must have either application_id or application_config" + ): + EMRServerlessStep( + name="test-step", + 
display_name="Test Step", + description="Test Description", + job_config=job_config, + ) + + # Should raise error when both provided + with pytest.raises( + ValueError, match="cannot have both application_id and application_config" + ): + EMRServerlessStep( + name="test-step", + display_name="Test Step", + description="Test Description", + job_config=job_config, + application_id="app-123", + application_config={"name": "test-app"}, + ) + + def test_to_request(self): + """Test EMRServerlessStep to_request method.""" + job_config = EMRServerlessJobConfig( + job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/script.py"}}, + execution_role_arn="arn:aws:iam::123456789012:role/EMRServerlessRole", + ) + + step = EMRServerlessStep( + name="test-step", + display_name="Test Step", + description="Test Description", + job_config=job_config, + application_id="app-123", + ) + + request = step.to_request() + assert request["Name"] == "test-step" + assert request["Type"] == "EMRServerless" + assert "Arguments" in request + assert request["DisplayName"] == "Test Step" + assert request["Description"] == "Test Description" diff --git a/v3-examples/ml-ops-examples/v3-emr-serverless-step-example.ipynb b/v3-examples/ml-ops-examples/v3-emr-serverless-step-example.ipynb new file mode 100644 index 0000000000..913b7d5529 --- /dev/null +++ b/v3-examples/ml-ops-examples/v3-emr-serverless-step-example.ipynb @@ -0,0 +1,551 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "4a989e09-4dc7-49bd-81e0-758fb266bab9", + "metadata": {}, + "source": [ + "# EMR Serverless Step in SageMaker Pipelines\n", + "\n", + "This notebook demonstrates how to use the `EMRServerlessStep` to run Spark jobs on EMR Serverless within a SageMaker Pipeline.\n", + "\n", + "## Prerequisites\n", + "- An AWS account with EMR Serverless access\n", + "\n", + "## What This Notebook Does\n", + "1. Creates an IAM role for EMR Serverless (if it doesn't exist)\n", + "2. Creates a sample PySpark script\n", + "3. Copies sample data to your S3 bucket\n", + "4. Creates an EMR Serverless step that provisions a new application\n", + "5. 
Creates and executes a SageMaker Pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c08d0de4-45c7-4341-8d69-8e5f213ea0b5", + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.mlops.workflow.emr_serverless_step import (\n", + " EMRServerlessStep,\n", + " EMRServerlessJobConfig,\n", + ")\n", + "from sagemaker.mlops.workflow.pipeline import Pipeline\n", + "from sagemaker.core.workflow.parameters import ParameterString\n", + "from sagemaker.core.workflow.pipeline_context import PipelineSession\n", + "from sagemaker.core.helper.session_helper import Session, get_execution_role" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "58fb807e-4ad6-41d6-8557-1eda65370b8b", + "metadata": {}, + "outputs": [], + "source": [ + "# Create the SageMaker Session\n", + "sagemaker_session = Session()\n", + "pipeline_session = PipelineSession()\n", + "region = sagemaker_session.boto_region_name\n", + "account_id = sagemaker_session.account_id()\n", + "\n", + "print(f\"Region: {region}\")\n", + "print(f\"Account ID: {account_id}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bad5f5fc-87aa-46ca-a338-a1dd82b08e61", + "metadata": {}, + "outputs": [], + "source": [ + "# Define variables and parameters needed for the Pipeline steps\n", + "role = get_execution_role()\n", + "default_bucket = sagemaker_session.default_bucket()\n", + "s3_prefix = \"v3-emr-serverless-pipeline\"\n", + "\n", + "# Pipeline parameters\n", + "emr_execution_role = ParameterString(\n", + " name=\"EMRServerlessExecutionRole\",\n", + " default_value=f\"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole\"\n", + ")\n", + "\n", + "spark_script_uri = ParameterString(\n", + " name=\"SparkScriptUri\",\n", + " default_value=f\"s3://{default_bucket}/{s3_prefix}/scripts/spark_job.py\"\n", + ")\n", + "\n", + "print(f\"Role: {role}\")\n", + "print(f\"Default Bucket: {default_bucket}\")" + ] + }, + { + "cell_type": "markdown", + "id": "4d96870c-3de6-4857-8041-dd82f5c7a27a", + "metadata": {}, + "source": [ + "## Create IAM Role for EMR Serverless\n", + "\n", + "The EMR Serverless job needs an execution role with permissions to access S3 and CloudWatch Logs." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "944de88b-3cd0-49d3-84c3-5465e4ae2763", + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import json\n", + "\n", + "# Create IAM role for EMR Serverless (if it doesn't exist)\n", + "iam_client = boto3.client('iam')\n", + "\n", + "trust_policy = {\n", + " \"Version\": \"2012-10-17\",\n", + " \"Statement\": [\n", + " {\n", + " \"Effect\": \"Allow\",\n", + " \"Principal\": {\n", + " \"Service\": \"emr-serverless.amazonaws.com\"\n", + " },\n", + " \"Action\": \"sts:AssumeRole\"\n", + " }\n", + " ]\n", + "}\n", + "\n", + "try:\n", + " iam_client.create_role(\n", + " RoleName=\"EMRServerlessExecutionRole\",\n", + " AssumeRolePolicyDocument=json.dumps(trust_policy),\n", + " Description=\"Execution role for EMR Serverless\"\n", + " )\n", + " print(\"Role created!\")\n", + "except iam_client.exceptions.EntityAlreadyExistsException:\n", + " print(\"Role already exists\")\n", + "\n", + "# Attach required policies\n", + "for policy_arn in [\n", + " \"arn:aws:iam::aws:policy/AmazonS3FullAccess\",\n", + " \"arn:aws:iam::aws:policy/CloudWatchLogsFullAccess\"\n", + "]:\n", + " iam_client.attach_role_policy(\n", + " RoleName=\"EMRServerlessExecutionRole\",\n", + " PolicyArn=policy_arn\n", + " )\n", + "\n", + "print(\"EMRServerlessExecutionRole is ready!\")\n", + "print(f\"Role ARN: arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2aa34ad2-705a-48c1-9335-5c0b62345f9c", + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p code" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "06a2c2c7-85e3-4087-b536-18d0d9ef7f2b", + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile code/spark_job.py\n", + "\n", + "\"\"\"Sample PySpark job for EMR Serverless.\"\"\"\n", + "import argparse\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.types import StructType, StructField, StringType, DoubleType\n", + "\n", + "\n", + "def main():\n", + " parser = argparse.ArgumentParser()\n", + " parser.add_argument(\"--input\", type=str, required=True, help=\"Input S3 path\")\n", + " parser.add_argument(\"--output\", type=str, required=True, help=\"Output S3 path\")\n", + " args = parser.parse_args()\n", + "\n", + " # Create Spark session\n", + " spark = SparkSession.builder.appName(\"EMRServerlessExample\").getOrCreate()\n", + "\n", + " print(f\"Reading data from: {args.input}\")\n", + " \n", + " # Define schema for abalone dataset (no headers in CSV)\n", + " schema = StructType([\n", + " StructField(\"sex\", StringType(), True),\n", + " StructField(\"length\", DoubleType(), True),\n", + " StructField(\"diameter\", DoubleType(), True),\n", + " StructField(\"height\", DoubleType(), True),\n", + " StructField(\"whole_weight\", DoubleType(), True),\n", + " StructField(\"shucked_weight\", DoubleType(), True),\n", + " StructField(\"viscera_weight\", DoubleType(), True),\n", + " StructField(\"shell_weight\", DoubleType(), True),\n", + " StructField(\"rings\", DoubleType(), True),\n", + " ])\n", + " \n", + " # Read input data (no header in abalone dataset)\n", + " df = spark.read.csv(args.input, header=False, schema=schema)\n", + " \n", + " # Simple transformation - show schema and count\n", + " print(\"Schema:\")\n", + " df.printSchema()\n", + " print(f\"Row count: {df.count()}\")\n", + " \n", + " # Show sample data\n", + " print(\"Sample data:\")\n", + " df.show(5)\n", + " \n", + " # Example transformation - 
compute statistics\n",
+    "    result_df = df.describe()\n",
+    "    \n",
+    "    # Write output\n",
+    "    print(f\"Writing results to: {args.output}\")\n",
+    "    result_df.write.mode(\"overwrite\").parquet(args.output)\n",
+    "    \n",
+    "    print(\"Job completed successfully!\")\n",
+    "    spark.stop()\n",
+    "\n",
+    "\n",
+    "if __name__ == \"__main__\":\n",
+    "    main()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9b00c944-b9b3-4de7-baa5-1968d56d239e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import boto3\n",
+    "\n",
+    "# Upload the Spark script to S3\n",
+    "s3_client = boto3.client(\"s3\")\n",
+    "script_s3_key = f\"{s3_prefix}/scripts/spark_job.py\"\n",
+    "\n",
+    "s3_client.upload_file(\n",
+    "    \"code/spark_job.py\",\n",
+    "    default_bucket,\n",
+    "    script_s3_key\n",
+    ")\n",
+    "\n",
+    "script_s3_uri = f\"s3://{default_bucket}/{script_s3_key}\"\n",
+    "print(f\"Spark script uploaded to: {script_s3_uri}\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1bfec273-51df-46e8-9415-210ad00e75cd",
+   "metadata": {},
+   "source": [
+    "## Copy Sample Data to Your Bucket\n",
+    "\n",
+    "We copy the sample data from the AWS public sample-files bucket into your bucket so that it is in the same region as your EMR Serverless application."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8cf48f54-f68d-4b66-99a1-4e3d9d8bfcf4",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import boto3\n",
+    "\n",
+    "# Copy sample data to your bucket (same region as EMR Serverless)\n",
+    "s3_resource = boto3.resource('s3')\n",
+    "copy_source = {\n",
+    "    'Bucket': 'sagemaker-sample-files',\n",
+    "    'Key': 'datasets/tabular/uci_abalone/abalone.csv'\n",
+    "}\n",
+    "\n",
+    "dest_key = f\"{s3_prefix}/input/abalone.csv\"\n",
+    "s3_resource.meta.client.copy(copy_source, default_bucket, dest_key)\n",
+    "\n",
+    "input_data_uri = f\"s3://{default_bucket}/{dest_key}\"\n",
+    "print(f\"Sample data copied to: {input_data_uri}\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "959d6945-f7c0-437e-9b81-3fe5076e310f",
+   "metadata": {},
+   "source": [
+    "## Create EMR Serverless Step\n",
+    "\n",
+    "The `EMRServerlessStep` supports two modes:\n",
+    "1. **Existing Application**: Use an existing EMR Serverless application ID\n",
+    "2. **New Application**: Create a new EMR Serverless application as part of the step\n",
+    "\n",
+    "This notebook uses Option 2 (New Application) so it works out of the box."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "96377844-595f-4c9a-844e-bc4ff344b108", + "metadata": {}, + "outputs": [], + "source": [ + "# Define the EMR Serverless job configuration\n", + "job_config = EMRServerlessJobConfig(\n", + " job_driver={\n", + " \"sparkSubmit\": {\n", + " \"entryPoint\": script_s3_uri,\n", + " \"entryPointArguments\": [\n", + " \"--input\", input_data_uri,\n", + " \"--output\", f\"s3://{default_bucket}/{s3_prefix}/output/\"\n", + " ],\n", + " }\n", + " },\n", + " execution_role_arn=f\"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole\",\n", + " configuration_overrides={\n", + " \"monitoringConfiguration\": {\n", + " \"s3MonitoringConfiguration\": {\n", + " \"logUri\": f\"s3://{default_bucket}/{s3_prefix}/logs/\"\n", + " }\n", + " }\n", + " }\n", + ")\n", + "\n", + "print(\"EMR Serverless Job Configuration created\")" + ] + }, + { + "cell_type": "markdown", + "id": "4202027e-a1c5-4a03-b971-e5c5603472f5", + "metadata": {}, + "source": [ + "### Option 1: Use Existing EMR Serverless Application (Optional)\n", + "\n", + "If you have an existing EMR Serverless application, you can use it instead. Uncomment the code below and replace `your-application-id` with your actual application ID." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6b0886fa-2c5b-4bf4-b617-dcbbc987f1c2", + "metadata": {}, + "outputs": [], + "source": [ + "# Option 1: Use an existing EMR Serverless application\n", + "# Uncomment below if you have an existing application\n", + "\n", + "# step_emr_serverless_existing = EMRServerlessStep(\n", + "# name=\"EMRServerlessSparkJob\",\n", + "# display_name=\"EMR Serverless Spark Job\",\n", + "# description=\"Run a PySpark job on EMR Serverless\",\n", + "# job_config=job_config,\n", + "# application_id=\"your-application-id\", # Replace with your application ID\n", + "# )\n", + "\n", + "print(\"Option 1 skipped - Using Option 2 (new application) below\")" + ] + }, + { + "cell_type": "markdown", + "id": "b844e517-eecc-4b74-8a60-18b74b070f4a", + "metadata": {}, + "source": [ + "### Option 2: Create New EMR Serverless Application (Default)\n", + "\n", + "This option creates a new EMR Serverless application as part of the pipeline step. The application will auto-start when needed and auto-stop after 15 minutes of idle time." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6bb8cebe-9495-48d4-b373-efaaf684d8c2", + "metadata": {}, + "outputs": [], + "source": [ + "# Option 2: Create a new EMR Serverless application as part of the step\n", + "# This is the default option that works out of the box\n", + "\n", + "step_emr_serverless = EMRServerlessStep(\n", + " name=\"EMRServerlessSparkJob\",\n", + " display_name=\"EMR Serverless Spark Job\",\n", + " description=\"Run a PySpark job with a newly created EMR Serverless application\",\n", + " job_config=job_config,\n", + " application_config={\n", + " \"name\": \"sagemaker-pipeline-spark-app\",\n", + " \"releaseLabel\": \"emr-6.15.0\",\n", + " \"type\": \"SPARK\",\n", + " \"autoStartConfiguration\": {\n", + " \"enabled\": True\n", + " },\n", + " \"autoStopConfiguration\": {\n", + " \"enabled\": True,\n", + " \"idleTimeoutMinutes\": 15\n", + " },\n", + " },\n", + ")\n", + "\n", + "print(f\"Step Name: {step_emr_serverless.name}\")\n", + "print(f\"Step Type: {step_emr_serverless.step_type}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "08b91bbc-70b1-4d93-8ea6-8e56ed457bab", + "metadata": {}, + "outputs": [], + "source": [ + "# Create the pipeline\n", + "\n", + "pipeline = Pipeline(\n", + " name=\"EMRServerlessPipeline\",\n", + " parameters=[\n", + " emr_execution_role,\n", + " spark_script_uri,\n", + " ],\n", + " steps=[step_emr_serverless],\n", + " sagemaker_session=pipeline_session,\n", + ")\n", + "\n", + "print(\"Pipeline created successfully!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e173f637-93f3-458a-8cb4-e6108a3093d3", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "\n", + "definition = json.loads(pipeline.definition())\n", + "print(json.dumps(definition, indent=2))" + ] + }, + { + "cell_type": "markdown", + "id": "6ee3b623-9cbc-4ed3-8bdb-9259f9465bb7", + "metadata": {}, + "source": [ + "## Execute Pipeline\n", + "\n", + "The cells below will:\n", + "1. Create/update the pipeline in SageMaker\n", + "2. Start the pipeline execution\n", + "3. Wait for completion" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a48b416b-4e45-4ba6-baef-a1d04d6639a0", + "metadata": {}, + "outputs": [], + "source": [ + "# Create/update the pipeline\n", + "pipeline.upsert(role_arn=role)\n", + "print(\"Pipeline upserted successfully!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01509f8a-1a35-4b88-82f1-c66bcca31be0", + "metadata": {}, + "outputs": [], + "source": [ + "# Start the pipeline execution\n", + "execution = pipeline.start()\n", + "print(f\"Pipeline execution started: {execution.arn}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4aa8914c-2421-4c3d-9b3b-4f6dce8f9d1f", + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "# Wait for pipeline execution to complete\n", + "while True:\n", + " status = execution.describe()['PipelineExecutionStatus']\n", + " print(f\"Status: {status}\")\n", + " \n", + " if status in ['Succeeded', 'Failed', 'Stopped']:\n", + " print(f\"Pipeline finished with status: {status}\")\n", + " break\n", + " \n", + " print(\"Still running... 
waiting 30 seconds\")\n",
+    "    time.sleep(30)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "daad6e09-04a2-4052-8009-d952c3bec3fa",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Check step execution details\n",
+    "steps = execution.list_steps()\n",
+    "for step in steps:\n",
+    "    print(f\"Step: {step['StepName']}\")\n",
+    "    print(f\"  Status: {step['StepStatus']}\")\n",
+    "    if 'FailureReason' in step:\n",
+    "        print(f\"  Failure Reason: {step['FailureReason']}\")\n",
+    "    print()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "68c159c3-a48f-44e3-ad79-56b1f04b6c8c",
+   "metadata": {},
+   "source": [
+    "## Cleanup (Optional)\n",
+    "\n",
+    "Uncomment the cell below to delete the pipeline when you're done."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "de7b5b50-a1c1-4c20-90da-a10d1b72b2ce",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Uncomment to delete the pipeline\n",
+    "# sm_client = sagemaker_session.sagemaker_client\n",
+    "# sm_client.delete_pipeline(PipelineName=\"EMRServerlessPipeline\")\n",
+    "# print(\"Pipeline deleted\")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.12"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}