How to Use PythonOperator in Airflow: Syntax and Example
Use PythonOperator in Airflow to run Python functions as tasks in your DAG. Define a Python function, then create a PythonOperator with that function assigned to the python_callable parameter.
Syntax
The PythonOperator takes these key parameters:
- task_id: A unique identifier for the task.
- python_callable: The Python function to execute.
- op_args (optional): List of positional arguments for the function.
- op_kwargs (optional): Dictionary of keyword arguments for the function.
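- dag: The DAG object this task belongs to. You can omit it when the operator is created inside a with DAG(...): block.
When the task runs, the operator calls python_callable with op_args as positional arguments and op_kwargs as keyword arguments. By default, the function's return value is pushed to XCom.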
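```python
from airflow.operators.python import PythonOperator

# your_function, arg1, arg2 and dag are placeholders for your own objects.
task = PythonOperator(
    task_id='task_name',            # unique within the DAG
    python_callable=your_function,  # the function to run
    op_args=[arg1, arg2],           # optional positional arguments
    op_kwargs={'key': 'value'},     # optional keyword arguments
    dag=dag,                        # the DAG this task belongs to
)
```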
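To see how op_args and op_kwargs reach your function, here is a minimal, stdlib-only model of the call the operator makes. It is a simplification, not Airflow's source: the real PythonOperator also handles templating and can pass Airflow context variables to the callable. The helper run_python_callable and the function describe are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Mapping, Optional, Sequence


def run_python_callable(
    python_callable: Callable[..., Any],
    op_args: Sequence[Any] = (),
    op_kwargs: Optional[Mapping[str, Any]] = None,
) -> Any:
    """Hypothetical stand-in for the operator's run-time call:
    op_args are unpacked as positional arguments, op_kwargs as keyword arguments."""
    return python_callable(*op_args, **(op_kwargs or {}))


def describe(city, country, units="metric"):
    return f"{city}, {country} ({units})"


# op_args fill positional parameters in order; op_kwargs fill them by name.
result = run_python_callable(
    describe,
    op_args=["Oslo", "Norway"],
    op_kwargs={"units": "imperial"},
)
print(result)  # Oslo, Norway (imperial)
```

Any parameter that neither list supplies falls back to its default, which is why units can be left out of op_kwargs entirely.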
Example
This example shows a simple DAG with a PythonOperator that prints a greeting message.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def greet(name):
    print(f"Hello, {name}!")


default_args = {
    'start_date': datetime(2024, 1, 1),
}

# `schedule` replaces the older `schedule_interval` argument (Airflow 2.4+).
dag = DAG('greet_dag', default_args=default_args, schedule='@daily')

greet_task = PythonOperator(
    task_id='say_hello',
    python_callable=greet,
    op_args=['Airflow'],  # passed to greet as name='Airflow'
    dag=dag,
)
```
Output
When say_hello runs, its task log includes this line:
Hello, Airflow!
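Because python_callable is an ordinary Python function, one option is to check its behavior without running a scheduler. The sketch below copies greet from the example and calls it with the same positional argument list, capturing stdout with the standard library instead of reading the Airflow task log. It does not exercise Airflow itself.

```python
import io
from contextlib import redirect_stdout


def greet(name):
    print(f"Hello, {name}!")


# Unpack the same op_args list the example passes to PythonOperator,
# and capture what the function prints.
buffer = io.StringIO()
with redirect_stdout(buffer):
    greet(*["Airflow"])

captured = buffer.getvalue()
print(repr(captured))  # 'Hello, Airflow!\n'
```

Testing the callable this way catches argument mistakes before the DAG is deployed.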
Common Pitfalls
Common mistakes when using PythonOperator include:
- Not passing the DAG object (and not creating the operator inside a with DAG(...): block), so the task is never attached to the DAG.
- Calling a function that requires arguments without supplying them through op_args or op_kwargs, which raises a TypeError when the task runs.
- Writing functions with side effects that are not idempotent, which can cause problems when a task is retried.
- Returning large or complex objects. The return value is pushed to XCom, so it should be small, serializable data; pass large datasets through external storage instead.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def add(a, b):
    return a + b


default_args = {'start_date': datetime(2024, 1, 1)}
dag = DAG('example_dag', default_args=default_args, schedule='@daily')

# Wrong: add() needs two arguments, but none are supplied.
# The DAG still parses; the task fails at run time with a TypeError.
wrong_task = PythonOperator(
    task_id='wrong_add',
    python_callable=add,
    dag=dag,
)

# Right: supply the arguments with op_args.
# The return value (8) is pushed to XCom.
right_task = PythonOperator(
    task_id='right_add',
    python_callable=add,
    op_args=[5, 3],
    dag=dag,
)
```
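The idempotency pitfall is easier to see side by side. This stdlib-only sketch uses dicts as a hypothetical stand-in for an external store (such as a table or bucket) and a run_date string as the key for one run; all of these names are illustrative. The appending function duplicates data when a task is retried, while the keyed overwrite leaves the same result no matter how many times it runs.

```python
from collections import defaultdict

# Hypothetical stand-ins for an external system such as a table or bucket.
appended_rows = defaultdict(list)
keyed_rows = {}


def load_not_idempotent(run_date, rows):
    # Appends on every run, so a retry writes the same rows twice.
    appended_rows[run_date].extend(rows)


def load_idempotent(run_date, rows):
    # Overwrites the slot for this run date, so a retry is harmless.
    keyed_rows[run_date] = list(rows)


rows = [1, 2, 3]
for _attempt in range(2):  # simulate the first try plus one retry
    load_not_idempotent("2024-01-01", rows)
    load_idempotent("2024-01-01", rows)

print(appended_rows["2024-01-01"])  # [1, 2, 3, 1, 2, 3]
print(keyed_rows["2024-01-01"])     # [1, 2, 3]
```

Keying writes by the run they belong to, rather than appending blindly, is one common way to make a python_callable safe to retry.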
Quick Reference
Remember these tips when using PythonOperator:
- Always assign a task_id that is unique within the DAG.
- Pass your Python function to python_callable.
- Use op_args and op_kwargs to pass parameters.
- Include the dag parameter to link the task to your DAG.
- Keep functions simple and idempotent so retries are safe.
Key Takeaways
Use PythonOperator to run Python functions as Airflow tasks by setting python_callable.
Always attach the task to a DAG (with dag= or a with DAG(...): block) and give it a unique task_id.
Pass function arguments with op_args (positional) or op_kwargs (keyword).
Make any side effects in your Python functions idempotent so retries are safe.
Check for missing arguments to prevent task failures.