
PythonOperator for custom logic in Apache Airflow - Commands & Configuration

Introduction
Sometimes you need to run your own Python code as part of a workflow. PythonOperator lets you run custom Python functions inside Airflow tasks to automate any logic you want.
  • When you want to run a Python function that processes data as part of your workflow.
  • When you need to call an API or perform calculations inside a task.
  • When you want to reuse existing Python code in your Airflow pipeline.
  • When you want to run simple scripts without creating separate executables.
  • When you want to chain multiple Python tasks with dependencies.
Config File - example_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def greet():
    print('Hello from PythonOperator!')

def add_numbers(a, b):
    print(f'Sum is {a + b}')

with DAG(
    dag_id='custom_python_operator_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    task1 = PythonOperator(
        task_id='greet_task',
        python_callable=greet,  # pass the function itself, not greet()
    )
    task2 = PythonOperator(
        task_id='add_task',
        python_callable=add_numbers,
        op_kwargs={'a': 5, 'b': 7},  # keyword arguments forwarded to add_numbers
    )
    task1 >> task2  # greet_task runs before add_task

This file defines an Airflow DAG named custom_python_operator_dag that runs daily starting from January 1, 2024.

It has two tasks using PythonOperator:

  • greet_task: runs the greet function which prints a greeting.
  • add_task: runs add_numbers with arguments 5 and 7, printing their sum.

The tasks run in order: greet_task first, then add_task.
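At execution time, Airflow calls the function you supplied, unpacking op_args and op_kwargs into it. The following is a simplified sketch of that invocation in plain Python, not Airflow's actual internals:

```python
def add_numbers(a, b):
    return a + b

op_args = []
op_kwargs = {'a': 5, 'b': 7}

# Conceptually, PythonOperator does: result = python_callable(*op_args, **op_kwargs)
result = add_numbers(*op_args, **op_kwargs)
print(f'Sum is {result}')  # Sum is 12
```

This is why the DAG file only wires up the callable and its arguments; nothing executes until the scheduler runs the task.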

Commands
List all available DAGs to verify that your DAG file is recognized by Airflow.
Terminal
airflow dags list
Expected Output
custom_python_operator_dag
example_other_dag
Manually trigger the DAG to run your PythonOperator tasks immediately.
Terminal
airflow dags trigger custom_python_operator_dag
Expected Output
Created <DagRun custom_python_operator_dag @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the DAG to see the task IDs you can monitor or run individually.
Terminal
airflow tasks list custom_python_operator_dag
Expected Output
greet_task
add_task
Run the greet_task for the given date to test the Python function without running the whole DAG.
Terminal
airflow tasks test custom_python_operator_dag greet_task 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:1234} INFO - Hello from PythonOperator!
[2024-06-01 12:00:00,001] {taskinstance.py:1235} INFO - Task greet_task succeeded
Key Concept

If you remember nothing else from this pattern, remember: PythonOperator lets you run any Python function as a task inside Airflow workflows.
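One detail worth knowing: by default, whatever the callable returns is pushed to XCom under the key return_value, so a downstream task can pull it. A sketch of that pattern (the task ids produce_task and consume_task are illustrative, and the operators would sit inside a with DAG(...) block like the one above):

```python
from airflow.operators.python import PythonOperator

def produce():
    return 42  # the return value is pushed to XCom as 'return_value' by default

def consume(ti):
    # Airflow injects the running task instance when a parameter is named 'ti'
    print(ti.xcom_pull(task_ids='produce_task'))

# inside a `with DAG(...)` block:
# produce_task = PythonOperator(task_id='produce_task', python_callable=produce)
# consume_task = PythonOperator(task_id='consume_task', python_callable=consume)
# produce_task >> consume_task
```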

Common Mistakes
Mistake: Passing the function call instead of the function itself to python_callable (e.g., python_callable=greet()).
Why it matters: The function runs immediately when the DAG file is parsed, not when the task runs, causing errors or unexpected behavior.
Fix: Pass the function name without parentheses: python_callable=greet.
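The difference is easy to see in plain Python: a bare function name is an object you can hand off for later execution, while greet() executes on the spot, which is exactly what happens during DAG parsing:

```python
def greet():
    return 'Hello from PythonOperator!'

deferred = greet      # a reference; nothing runs yet
immediate = greet()   # runs right now, as python_callable=greet() would at parse time

assert callable(deferred)
assert deferred() == 'Hello from PythonOperator!'
assert immediate == 'Hello from PythonOperator!'
```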
Mistake: Not setting op_kwargs or op_args when the Python function requires parameters.
Why it matters: The task will fail because the function expects arguments but none are provided.
Fix: Use op_kwargs={'param': value} or op_args=[value] to pass arguments to the function.
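Both styles deliver the same arguments to the callable; the following plain-Python sketch shows the equivalence for the add_numbers function from the example:

```python
def add_numbers(a, b):
    return a + b

via_kwargs = add_numbers(**{'a': 5, 'b': 7})  # what op_kwargs={'a': 5, 'b': 7} produces
via_args = add_numbers(*[5, 7])               # what op_args=[5, 7] produces

assert via_kwargs == via_args == 12
```

Prefer op_kwargs for functions with several parameters, since keyword arguments keep the mapping explicit.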
Mistake: Not setting catchup=False in the DAG when testing manually.
Why it matters: Airflow tries to run all past scheduled runs since start_date, which can be confusing during development.
Fix: Set catchup=False in the DAG definition to avoid running old schedules.
Summary
Define Python functions with the logic you want to run as tasks.
Use PythonOperator in your DAG to run these functions as tasks.
Trigger and test tasks using Airflow CLI commands to verify behavior.