How to Use Prefect for ML Pipelines: Simple Guide
Use Prefect to create and manage ML pipelines by defining tasks with @task and combining them into flows with @flow. Prefect handles execution, retries, and scheduling, making your ML workflow reliable and easy to monitor.
Syntax
Prefect pipelines are built using two main decorators: @task to define individual steps, and @flow to group tasks into a pipeline. Tasks are Python functions that do work like data loading or model training. Flows run these tasks in order and manage retries and dependencies.
- @task: Marks a function as a task.
- @flow: Defines the pipeline that runs tasks.
- Task call: Invoking a task inside a flow runs it and returns its result; task.submit() runs it concurrently and returns a future.
```python
from prefect import task, flow

@task
def load_data():
    return [1, 2, 3, 4, 5]

@task
def train_model(data):
    return sum(data) / len(data)

@flow
def ml_pipeline():
    data = load_data()
    model = train_model(data)
    print(f"Model result: {model}")

if __name__ == "__main__":
    ml_pipeline()
```
Output
Model result: 3.0
Example
This example shows a simple ML pipeline using Prefect. It loads data, trains a model by calculating the average, and prints the result. Prefect manages the flow execution and task dependencies automatically.
```python
from prefect import task, flow

@task
def load_data():
    print("Loading data...")
    return [10, 20, 30, 40, 50]

@task
def preprocess_data(data):
    print("Preprocessing data...")
    return [x / 10 for x in data]

@task
def train_model(data):
    print("Training model...")
    return sum(data) / len(data)

@flow
def ml_pipeline():
    data = load_data()
    processed = preprocess_data(data)
    model_result = train_model(processed)
    print(f"Trained model output: {model_result}")

if __name__ == "__main__":
    ml_pipeline()
```
Output
Loading data...
Preprocessing data...
Training model...
Trained model output: 3.0
Common Pitfalls
Common mistakes when using Prefect for ML pipelines include:
- Not decorating functions with @task, so Prefect cannot manage them.
- Calling tasks outside a flow context, so Prefect cannot orchestrate their execution.
- Ignoring task dependencies, causing tasks to run out of order.
- Not handling exceptions or retries, which can cause pipeline failures.
Always use @task and @flow decorators and call tasks directly inside flows.
```python
from prefect import task, flow

# Wrong: missing @task decorator
def load_data():
    return [1, 2, 3]

@task
def train_model(data):
    return sum(data)

@flow
def ml_pipeline():
    data = load_data()  # This runs immediately, not as a task
    result = train_model(data)
    print(result)

if __name__ == "__main__":
    ml_pipeline()
```

Right way:

```python
from prefect import task, flow

@task
def load_data():
    return [1, 2, 3]

@task
def train_model(data):
    return sum(data)

@flow
def ml_pipeline():
    data = load_data()
    result = train_model(data)
    print(result)

if __name__ == "__main__":
    ml_pipeline()
```
Quick Reference
Here is a quick cheat sheet for using Prefect in ML pipelines:
| Concept | Description | Example |
|---|---|---|
| @task | Decorator to mark a function as a Prefect task | @task<br>def load_data(): ... |
| @flow | Decorator to define a pipeline that runs tasks | @flow<br>def pipeline(): ... |
| Task call | Call a task inside a flow to run it | data = load_data() |
| Retries | Configure task retries for failures | @task(retries=3) |
| Parameters | Pass inputs to flows and tasks | @flow<br>def pipeline(param): ... |
Key Takeaways
- Use @task to mark functions as Prefect tasks for pipeline steps.
- Group tasks inside a @flow function to create the ML pipeline.
- Call tasks inside flows to let Prefect manage execution and dependencies.
- Handle retries and failures with Prefect task options for reliability.
- Test pipelines locally before deploying to Prefect Cloud or a server.