How to Use SageMaker Pipelines for ML Workflow Automation
Use SageMaker Pipelines to automate ML workflows by defining a sequence of steps, such as data processing, training, and evaluation, using the Pipeline class. You create steps with built-in or custom components, then build and run the pipeline to manage your ML tasks efficiently.

Syntax
A SageMaker pipeline is created by defining steps and then combining them into a Pipeline object. Each step represents a task like data processing or model training.
- Pipeline: The main object that holds all steps.
- Steps: Individual tasks such as ProcessingStep, TrainingStep, and ModelStep.
- Parameters: Variables that make pipelines flexible.
- Execution: Running the pipeline triggers all steps in order.
```python
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.estimator import Estimator

# Define processing step
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri='123456789012.dkr.ecr.us-west-2.amazonaws.com/my-image:latest',
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge'
)
processing_step = ProcessingStep(
    name='MyProcessingStep',
    processor=script_processor,
    inputs=[],
    outputs=[],
    code='preprocessing.py'
)

# Define training step
estimator = Estimator(
    image_uri='123456789012.dkr.ecr.us-west-2.amazonaws.com/my-training-image:latest',
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge'
)
training_step = TrainingStep(
    name='MyTrainingStep',
    estimator=estimator,
    inputs={'train': 's3://my-bucket/train-data/'}
)

# Create pipeline
pipeline = Pipeline(
    name='MyPipeline',
    steps=[processing_step, training_step]
)
```
Example
This example shows how to create a simple SageMaker pipeline with a processing step and a training step, then start the pipeline execution.
```python
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.estimator import Estimator
from sagemaker.session import Session

# Initialize SageMaker session
sagemaker_session = Session()

# Define a script processor for data preprocessing
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri='683313688378.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge',
    sagemaker_session=sagemaker_session
)

# Create a processing step
processing_step = ProcessingStep(
    name='PreprocessData',
    processor=script_processor,
    inputs=[],
    outputs=[],
    code='preprocessing.py'
)

# Define an estimator for training
estimator = Estimator(
    image_uri='382416733822.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest',
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge',
    sagemaker_session=sagemaker_session
)

# Create a training step
training_step = TrainingStep(
    name='TrainModel',
    estimator=estimator,
    inputs={'train': 's3://my-bucket/train-data/'}
)

# Build the pipeline
pipeline = Pipeline(
    name='MySimplePipeline',
    steps=[processing_step, training_step],
    sagemaker_session=sagemaker_session
)

# Register (create or update) the pipeline definition before starting it
pipeline.upsert(role_arn='arn:aws:iam::123456789012:role/SageMakerRole')

# Submit the pipeline for execution
execution = pipeline.start()

# Print execution ARN
print(f"Pipeline execution started with ARN: {execution.arn}")
```
Output
Pipeline execution started with ARN: arn:aws:sagemaker:us-west-2:123456789012:pipeline/MySimplePipeline/execution/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
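Starting a pipeline is asynchronous, so you will usually want to check on the run afterward. A minimal sketch of a helper that reports on the execution handle returned by `pipeline.start()` (the helper name `report_execution` is illustrative, not part of the SDK):

```python
def report_execution(execution):
    """Print status and per-step progress for a pipeline execution.

    Pass the handle returned by pipeline.start().
    """
    # Overall run status, e.g. 'Executing', 'Succeeded', 'Failed'
    status = execution.describe()['PipelineExecutionStatus']
    print(f"Current status: {status}")

    # Per-step progress so far
    for step in execution.list_steps():
        print(step['StepName'], step['StepStatus'])

    # Block until the run finishes (raises if the pipeline fails)
    execution.wait()
```

Calling `report_execution(execution)` right after `pipeline.start()` lets you follow the run from a notebook instead of the console.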
Common Pitfalls
Common mistakes when using SageMaker pipelines include:
- Not specifying the correct role with permissions for SageMaker operations.
- Forgetting to provide required inputs or outputs for processing and training steps.
- Using incompatible instance types or omitting the sagemaker_session parameter.
- Not handling pipeline parameters properly, which reduces pipeline flexibility.
- Trying to run pipelines without uploading scripts or data to accessible S3 locations.
Always check AWS IAM permissions and ensure all resources are accessible.
```python
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor

# Wrong: missing role parameter
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri='my-image',
    instance_count=1,
    instance_type='ml.m5.xlarge'
)

# Right: include a role with SageMaker permissions
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri='my-image',
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge'
)
```
Quick Reference
SageMaker Pipelines Cheat Sheet:
| Component | Description |
|---|---|
| Pipeline | Defines the workflow with ordered steps. |
| ProcessingStep | Runs data processing scripts. |
| TrainingStep | Trains ML models using estimators. |
| ModelStep | Creates model objects for deployment. |
| Parameters | Make pipelines flexible and reusable. |
| start() | Runs the pipeline asynchronously. |
Key Takeaways
- Define your ML workflow as a sequence of steps using the SageMaker Pipeline and step classes.
- Always specify the correct IAM role and ensure all scripts and data are accessible in S3.
- Use pipeline parameters to make your workflows flexible and reusable.
- Run pipelines with the start() method to execute all steps automatically.
- Check for common errors like missing roles or inputs to avoid pipeline failures.