How to Use Prefect for ML Pipelines: A Simple Guide

Use Prefect to create and manage ML pipelines by defining tasks with @task and combining them into flows with @flow. Prefect handles execution, retries, and scheduling, making your ML workflow reliable and easy to monitor.

Syntax

Prefect pipelines are built using two main decorators: @task to define individual steps, and @flow to group tasks into a pipeline. Tasks are Python functions that do work like data loading or model training. Flows run these tasks in order and manage retries and dependencies.

  • @task: Marks a function as a task.
  • @flow: Defines the pipeline that runs tasks.
  • Task call: Calling a task inside a flow, such as load_data(), runs it and returns its result.
python
from prefect import task, flow

@task
def load_data():
    return [1, 2, 3, 4, 5]

@task
def train_model(data):
    return sum(data) / len(data)

@flow
def ml_pipeline():
    data = load_data()
    model = train_model(data)
    print(f"Model result: {model}")

if __name__ == "__main__":
    ml_pipeline()
Output
Model result: 3.0

Example

This example shows a simple ML pipeline using Prefect. It loads data, preprocesses it by scaling each value, "trains" a model by calculating the average, and prints the result. Prefect manages the flow execution and task dependencies automatically.

python
from prefect import task, flow

@task
def load_data():
    print("Loading data...")
    return [10, 20, 30, 40, 50]

@task
def preprocess_data(data):
    print("Preprocessing data...")
    return [x / 10 for x in data]

@task
def train_model(data):
    print("Training model...")
    return sum(data) / len(data)

@flow
def ml_pipeline():
    data = load_data()
    processed = preprocess_data(data)
    model_result = train_model(processed)
    print(f"Trained model output: {model_result}")

if __name__ == "__main__":
    ml_pipeline()
Output
Loading data...
Preprocessing data...
Training model...
Trained model output: 3.0

Common Pitfalls

Common mistakes when using Prefect for ML pipelines include:

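  • Not decorating functions with @task, so Prefect runs them as plain Python and cannot track or retry them.
  • Calling tasks outside a @flow function, where Prefect has no flow run to attach them to.
  • Ignoring task dependencies: when tasks are submitted to run concurrently, a task that does not receive the upstream result as an argument can start before that result is ready.
  • Not configuring retries or handling exceptions, so a single transient error fails the whole pipeline.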

Always use @task and @flow decorators and call tasks directly inside flows.

python
from prefect import task, flow

# Wrong: missing @task decorator

def load_data():
    return [1, 2, 3]

@task
def train_model(data):
    return sum(data)

@flow
def ml_pipeline():
    data = load_data()  # This runs immediately, not as a task
    result = train_model(data)
    print(result)

if __name__ == "__main__":
    ml_pipeline()

# Right way:

from prefect import task, flow

@task
def load_data():
    return [1, 2, 3]

@task
def train_model(data):
    return sum(data)

@flow
def ml_pipeline():
    data = load_data()
    result = train_model(data)
    print(result)

if __name__ == "__main__":
    ml_pipeline()

Quick Reference

Here is a quick cheat sheet for using Prefect in ML pipelines:

Concept | Description | Example
@task | Decorator to mark a function as a Prefect task | @task\ndef load_data(): ...
@flow | Decorator to define a pipeline that runs tasks | @flow\ndef pipeline(): ...
Task call | Call a task inside a flow to run it | data = load_data()
Retries | Configure task retries for failures | @task(retries=3)
Parameters | Pass inputs to flows and tasks | @flow\ndef pipeline(param): ...

Key Takeaways

  • Use @task to mark functions as Prefect tasks for pipeline steps.
  • Group tasks inside a @flow function to create the ML pipeline.
  • Call tasks inside flows to let Prefect manage execution and dependencies.
  • Handle retries and failures with Prefect task options for reliability.
  • Test pipelines locally before deploying to Prefect Cloud or server.