
Pipeline components and DAGs in MLOps - Commands & Configuration

Introduction
When you build machine learning workflows, you need to organize tasks so they run in a reliable order. Pipeline components are the building blocks of these workflows, and a DAG (Directed Acyclic Graph) describes how the tasks connect: its edges define execution order, and it contains no loops. This pattern is useful in situations like the following:
When you want to automate data preprocessing, model training, and evaluation steps in one flow
When you need to run tasks in a specific order and handle dependencies between them
When you want to reuse parts of your workflow as separate components for different projects
When you want to visualize the flow of your machine learning pipeline clearly
When you want to schedule and monitor your ML workflows reliably
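The ordering guarantee a DAG gives you can be sketched independently of any framework. The snippet below (task names are illustrative, not tied to kfp) runs Kahn's algorithm to produce an execution order that respects every dependency, and rejects graphs with cycles:

```python
from collections import deque

# Edges point from a task to the tasks that depend on it.
edges = {
    "preprocess": ["train"],
    "train": ["evaluate"],
    "evaluate": [],
}

def topological_order(edges):
    """Return tasks in an order that respects every dependency (Kahn's algorithm)."""
    indegree = {node: 0 for node in edges}
    for downstream in edges.values():
        for node in downstream:
            indegree[node] += 1
    ready = deque(node for node, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for downstream in edges[node]:
            indegree[downstream] -= 1
            if indegree[downstream] == 0:
                ready.append(downstream)
    if len(order) != len(edges):
        raise ValueError("graph has a cycle; not a DAG")
    return order

print(topological_order(edges))  # ['preprocess', 'train', 'evaluate']
```

This is essentially what a pipeline engine does before scheduling: it sorts the graph topologically so no task starts before the tasks it depends on.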
Commands
This Python script defines three pipeline components: preprocess, train, and evaluate. It then defines a pipeline that chains them with .after(), which is what gives the workflow its DAG structure. Finally, it compiles the pipeline to a YAML file for deployment.
Python
from kfp import dsl

@dsl.component
def preprocess_op():
    print('Preprocessing data')

@dsl.component
def train_op():
    print('Training model')

@dsl.component
def evaluate_op():
    print('Evaluating model')

@dsl.pipeline(name='ml-pipeline')
def ml_pipeline():
    preprocess = preprocess_op()
    train = train_op()
    train.after(preprocess)
    evaluate = evaluate_op()
    evaluate.after(train)

if __name__ == '__main__':
    from kfp import compiler
    compiler.Compiler().compile(pipeline_func=ml_pipeline, package_path='ml_pipeline.yaml')
Expected Output
No output (command runs silently)
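To make the .after() semantics concrete without a cluster, here is a toy, framework-free runner (all names hypothetical, not the kfp implementation) in which .after() records dependencies and the runner executes each task only after everything it depends on has finished:

```python
class Task:
    """Toy stand-in for a pipeline task; .after() records dependencies."""
    def __init__(self, name, fn):
        self.name, self.fn, self.deps = name, fn, []

    def after(self, *tasks):
        self.deps.extend(tasks)
        return self

def run_pipeline(tasks):
    """Execute tasks so every dependency runs before its dependents."""
    done, log = set(), []
    def run(task):
        if task.name in done:
            return
        for dep in task.deps:   # recurse into dependencies first
            run(dep)
        task.fn()
        done.add(task.name)
        log.append(task.name)
    for task in tasks:
        run(task)
    return log

preprocess = Task("preprocess", lambda: print("Preprocessing data"))
train = Task("train", lambda: print("Training model")).after(preprocess)
evaluate = Task("evaluate", lambda: print("Evaluating model")).after(train)

print(run_pipeline([evaluate]))  # ['preprocess', 'train', 'evaluate']
```

Asking only for the final task is enough: the dependency chain pulls in preprocess and train automatically, which mirrors how a real engine resolves the DAG.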
This command submits the compiled pipeline YAML to Kubeflow Pipelines, starting a run named 'run1' under the experiment 'ml-experiment'. Submitting the run triggers execution of the DAG.
Terminal
kfp run submit --experiment-name ml-experiment --run-name run1 --package-file ml_pipeline.yaml
Expected Output
Run submitted successfully.
Run ID: 12345
Pipeline: ml-pipeline
Experiment: ml-experiment
Run name: run1
--experiment-name - Names the experiment that groups related runs
--run-name - Labels this specific run
--package-file - Path to the compiled pipeline YAML to submit
This command lists the runs of an experiment so you can check the status of your pipeline executions. Note that kfp run list filters by experiment ID, not by name.
Terminal
kfp run list --experiment-id <experiment-id>
Expected Output
Run ID  Run Name  Status     Created At
12345   run1      Succeeded  2024-06-01 10:00:00
--experiment-id - Filters runs by the experiment's ID
Key Concept

If you remember nothing else from this pattern, remember: pipeline components are the tasks, and the DAG defines their execution order without loops.

Code Example
MLOps
from kfp import dsl

@dsl.component
def preprocess_op():
    print('Preprocessing data')

@dsl.component
def train_op():
    print('Training model')

@dsl.component
def evaluate_op():
    print('Evaluating model')

@dsl.pipeline(name='ml-pipeline')
def ml_pipeline():
    preprocess = preprocess_op()
    train = train_op()
    train.after(preprocess)
    evaluate = evaluate_op()
    evaluate.after(train)

if __name__ == '__main__':
    from kfp import compiler
    compiler.Compiler().compile(pipeline_func=ml_pipeline, package_path='ml_pipeline.yaml')
    print('Pipeline compiled to ml_pipeline.yaml')
Expected Output
Pipeline compiled to ml_pipeline.yaml
Common Mistakes
Not specifying the order of components in the pipeline
The tasks may run in any order or in parallel, causing errors or wrong results
Use the .after() method or set dependencies explicitly to define the execution order
Creating cycles in the DAG by making tasks depend on each other circularly
DAGs must not have loops; cycles cause the pipeline to fail or hang
Ensure dependencies form a directed acyclic graph with no loops
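You can catch circular dependencies before submitting anything. The following sketch (illustrative, not part of kfp) uses depth-first search with three colors to detect a cycle in a dependency graph given as {task: [downstream tasks]}:

```python
def has_cycle(edges):
    """Detect a cycle in a dependency graph given as {node: [downstream, ...]}."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / in progress / finished
    color = {node: WHITE for node in edges}

    def visit(node):
        color[node] = GRAY
        for nxt in edges.get(node, []):
            if color.get(nxt, WHITE) == GRAY:   # back edge: we looped onto our own path
                return True
            if color.get(nxt, WHITE) == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in edges)

print(has_cycle({"a": ["b"], "b": ["c"], "c": []}))  # False: a valid DAG
print(has_cycle({"a": ["b"], "b": ["a"]}))           # True: a circular dependency
```

Running a check like this in a pre-commit hook or test suite is a cheap way to fail fast, instead of discovering the cycle when the pipeline hangs on the cluster.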
Summary
Define pipeline components as small tasks using the @dsl.component decorator.
Create a pipeline function that connects components in order using .after() to form a DAG.
Compile the pipeline to a YAML file and submit it to run on Kubeflow Pipelines.
List runs to monitor pipeline execution status.