
Creating custom operators in Apache Airflow - Step-by-Step CLI Walkthrough

Introduction
Sometimes Airflow's built-in operators do not fit your specific needs. Creating a custom operator lets you define your own task behavior and package it for reuse. Typical situations:
When you need to run a task that is not covered by Airflow's default operators.
When you want to reuse a complex task logic across multiple workflows.
When you want to simplify your DAG code by encapsulating task details.
When you want to add custom logging or error handling to a task.
When integrating with a new system or API that Airflow does not support out of the box.
Operator File - my_custom_operator.py
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        # Always call super().__init__() so Airflow sets up task_id, retries, etc.
        # (The apply_defaults decorator is deprecated in Airflow 2.x and no longer needed;
        # BaseOperator also accepts keyword arguments only, so *args is dropped.)
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # execute() is the entry point Airflow calls when the task instance runs.
        print(f"Running custom operator with parameter: {self.my_param}")
        # Add your custom task logic here (self.log.info is the idiomatic way to log)
        return f"Task completed with {self.my_param}"

This file defines MyCustomOperator, a new operator class that inherits from Airflow's BaseOperator. The __init__ method accepts custom parameters (and must call super().__init__()), and the execute method contains the task's main logic. Once the file is importable (for example, placed in your DAGs folder or on the PYTHONPATH), the operator can be used in DAGs like any built-in operator.
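To show how the operator plugs into a pipeline, here is a sketch of a DAG file matching the dag_id and task_id used in the commands below. The file layout, parameter value, and start date are assumptions, and the `schedule` argument assumes Airflow 2.4+ (older versions use `schedule_interval`).

```python
# example_custom_operator_dag.py - hypothetical DAG wiring up MyCustomOperator.
# Assumes my_custom_operator.py sits next to this file in the dags folder.
from datetime import datetime

from airflow import DAG

from my_custom_operator import MyCustomOperator

with DAG(
    dag_id="example_custom_operator_dag",
    start_date=datetime(2024, 6, 1),  # assumed start date
    schedule=None,   # run only when triggered manually
    catchup=False,
) as dag:
    my_custom_task = MyCustomOperator(
        task_id="my_custom_task",
        my_param="test_value",  # assumed parameter value
    )
```

Because the DAG body is declarative wiring that only runs inside an Airflow installation, treat it as a configuration sketch rather than a standalone script.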

Commands
Check the list of available DAGs to confirm Airflow is running and ready.
Terminal
airflow dags list
Expected Output
example_dag
Trigger a DAG that uses the custom operator to run the task immediately.
Terminal
airflow dags trigger example_custom_operator_dag
Expected Output
Created <DagRun example_custom_operator_dag @ 2024-06-01 12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the DAG to verify the custom operator task is present.
Terminal
airflow tasks list example_custom_operator_dag
Expected Output
my_custom_task
Run the custom operator task locally for the given execution date to test its behavior.
Terminal
airflow tasks test example_custom_operator_dag my_custom_task 2024-06-01
Expected Output
Running custom operator with parameter: test_value
[2024-06-01 12:00:00,000] {taskinstance.py:XXXX} INFO - Task completed with test_value
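Conceptually, `airflow tasks test` builds a task instance and calls the operator's execute() method with a context dictionary. The contract can be sketched in pure Python; BaseOperator is stubbed here so the snippet runs without Airflow installed, and the context keys shown are illustrative.

```python
# Pure-Python sketch of what `airflow tasks test` effectively does:
# instantiate the operator, then call execute() with a context dict.

class BaseOperator:
    # Minimal stand-in for airflow.models.BaseOperator (illustration only).
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id


class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        print(f"Running custom operator with parameter: {self.my_param}")
        return f"Task completed with {self.my_param}"


op = MyCustomOperator(task_id="my_custom_task", my_param="test_value")
result = op.execute(context={"ds": "2024-06-01"})  # context is normally built by Airflow
print(result)  # → Task completed with test_value
```

The key takeaway is that execute() receives the runtime context and its return value becomes the task's result (pushed to XCom in real Airflow).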
Key Concept

If you remember nothing else from this pattern, remember: custom operators let you package unique task logic into reusable, clean components for your Airflow workflows.

Common Mistakes
Not calling super().__init__() in the custom operator's __init__ method
This causes Airflow to miss important setup steps, leading to errors or missing features.
Always call super().__init__(**kwargs) at the top of your __init__ method (BaseOperator takes keyword arguments only in Airflow 2.x).
Putting task logic outside the execute() method
Airflow runs the execute() method to perform the task; logic outside it won't run during task execution.
Put all task-specific code inside the execute() method.
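The parse-time vs. run-time distinction behind this mistake can be demonstrated in plain Python. BaseOperator is stubbed so the snippet runs without Airflow; the operator and event names are illustrative.

```python
# Sketch of the mistake: work done in __init__ runs every time the DAG file
# is *parsed* by the scheduler, not when the task actually executes.
events = []


class BaseOperator:
    # Minimal stand-in for airflow.models.BaseOperator (illustration only).
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id


class BadOperator(BaseOperator):
    def __init__(self, task_id, **kwargs):
        super().__init__(task_id=task_id, **kwargs)
        events.append("heavy work in __init__")  # runs on every DAG parse!

    def execute(self, context):
        events.append("work in execute")  # runs only when the task runs
        return "done"


op = BadOperator(task_id="my_task")  # merely parsing the DAG triggers __init__
print(events)   # → ['heavy work in __init__']
op.execute({})  # only execution reaches execute()
print(events)   # → ['heavy work in __init__', 'work in execute']
```

Keeping heavy logic inside execute() avoids slowing down DAG parsing and ensures the work actually happens during task runs.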
Not importing the custom operator correctly in the DAG file
Airflow won't recognize the operator and will fail to parse the DAG.
Ensure the custom operator file is in the PYTHONPATH or the DAG folder and import it properly.
Summary
Create a Python class inheriting from BaseOperator to define a custom operator.
Implement the execute() method with the task's main logic.
Use airflow CLI commands to list DAGs, trigger runs, list tasks, and test tasks locally.