
How to Use Vertex AI Pipelines for ML Workflow Automation

To use Vertex AI Pipelines, define your ML workflow with the kfp (Kubeflow Pipelines) SDK, then compile it and submit it to Vertex AI for execution. This automates tasks such as data preprocessing, training, and deployment in a scalable way on Google Cloud.
📝

Syntax

Vertex AI Pipelines uses the kfp (Kubeflow Pipelines) SDK to define workflows as Python functions decorated with @dsl.pipeline. Each step is a component that performs a task. You compile the pipeline to a JSON file and submit it to Vertex AI with the google-cloud-aiplatform client library.

Key parts:

  • @dsl.pipeline: marks the pipeline function.
  • Pipeline parameters: inputs to customize runs.
  • Components: individual tasks like training or preprocessing.
  • compiler.Compiler().compile(): converts pipeline code to JSON.
  • Vertex AI client: submits and manages pipeline runs.
python
from kfp import dsl
from kfp import compiler

@dsl.pipeline(name='sample-pipeline')
def pipeline(param1: str = 'default'):
    # Define pipeline steps here
    pass

compiler.Compiler().compile(pipeline_func=pipeline, package_path='pipeline.json')
💻

Example

This example shows a simple Vertex AI pipeline that prints a message. It defines a pipeline with one step, compiles it, and submits it to Vertex AI for execution.

python
from kfp import dsl
from kfp import compiler
from google.cloud import aiplatform

@dsl.component
def print_message_op(message: str):
    print(message)

@dsl.pipeline(name='hello-world-pipeline')
def hello_world_pipeline(message: str = 'Hello, Vertex AI!'):
    print_message_op(message=message)

# Compile the pipeline to a JSON file
compiler.Compiler().compile(
    pipeline_func=hello_world_pipeline,
    package_path='hello_world_pipeline.json'
)

# Initialize Vertex AI client
project_id = 'your-project-id'
region = 'us-central1'
aiplatform.init(project=project_id, location=region)

# Submit the pipeline job
job = aiplatform.PipelineJob(
    display_name='hello-world-pipeline-job',
    template_path='hello_world_pipeline.json',
    pipeline_root='gs://your-bucket/pipeline-root/',
    parameter_values={'message': 'Hello, Vertex AI Pipelines!'}
)
job.run()
Output
INFO: Created PipelineJob: projects/your-project-id/locations/us-central1/pipelineJobs/hello-world-pipeline-job
INFO: PipelineJob run started...
⚠️

Common Pitfalls

Common mistakes when using Vertex AI pipelines include:

  • Not setting pipeline_root to a valid Google Cloud Storage path, causing pipeline failures.
  • Forgetting to initialize the Vertex AI client with aiplatform.init() before submitting jobs.
  • Using components without the @dsl.component decorator, which prevents them from running as pipeline steps.
  • Not compiling the pipeline before submission, leading to errors.
  • Incorrectly specifying pipeline parameters or missing required parameters.
python
from kfp import dsl

# Wrong: missing @dsl.component decorator -- the function runs as plain
# Python while the pipeline is compiled, not as a pipeline step

def step_without_decorator():
    print('This runs at compile time, not on Vertex AI')

@dsl.pipeline(name='bad-pipeline')
def bad_pipeline():
    step_without_decorator()

# Right: Add @dsl.component decorator

@dsl.component
def step_with_decorator():
    print('This works')

@dsl.pipeline(name='good-pipeline')
def good_pipeline():
    step_with_decorator()
📊

Quick Reference

Tips for using Vertex AI pipelines effectively:

  • Always decorate pipeline steps with @dsl.component.
  • Compile your pipeline with compiler.Compiler().compile() before submission.
  • Set pipeline_root to a Google Cloud Storage bucket for storing pipeline artifacts.
  • Initialize Vertex AI SDK with aiplatform.init() using your project and region.
  • Use PipelineJob.run() to start pipeline execution and monitor progress in Google Cloud Console.
✅

Key Takeaways

  • Define your ML workflow as a pipeline function using the kfp SDK and the @dsl.pipeline decorator.
  • Compile the pipeline to a JSON file before submitting it to Vertex AI.
  • Initialize the Vertex AI client with your project and region before running pipelines.
  • Use @dsl.component to mark each pipeline step for proper execution.
  • Set a valid Google Cloud Storage path as pipeline_root to store pipeline data.