
How to Create Custom Operator in Airflow: Simple Guide

To create a custom operator in Airflow, define a new Python class that inherits from BaseOperator and implement the execute() method with your task logic. Then, use this operator in your DAG like any built-in operator.

Syntax

Creating a custom operator means subclassing BaseOperator and overriding its execute() method, which holds the task's main logic. To accept your own parameters, add an __init__() method and call super().__init__(**kwargs) so BaseOperator still receives task_id and its other standard arguments.

  • class MyOperator(BaseOperator): defines the new operator class.
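  • def __init__(self, param1, **kwargs): stores the operator's own parameters and passes everything else (task_id, retries, and so on) to BaseOperator.
  • def execute(self, context): contains the task logic that Airflow runs; context is a dict-like object with runtime information such as the task instance and logical date.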
python
from airflow.models import BaseOperator


class MyOperator(BaseOperator):
    # No @apply_defaults needed: since Airflow 2.0 BaseOperator applies default_args itself
    def __init__(self, param1, **kwargs):
        # Forward task_id, retries, etc. to BaseOperator
        super().__init__(**kwargs)
        self.param1 = param1

    def execute(self, context):
        # Task logic here; Airflow calls this when the task runs
        print(f"Running task with param1={self.param1}")

Example

This example defines a custom operator that prints a message passed in as a parameter, then uses it as a task in a DAG.

python
from datetime import datetime

from airflow import DAG
from airflow.models import BaseOperator


class PrintOperator(BaseOperator):
    def __init__(self, message, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        print(f"Message: {self.message}")


with DAG(
    dag_id='custom_operator_dag',
    start_date=datetime(2024, 1, 1),  # fixed date; days_ago() is deprecated
    schedule='@once',  # Airflow 2.4+; older 2.x releases use schedule_interval
) as dag:
    task = PrintOperator(task_id='print_task', message='Hello from custom operator!')
Output (written to the print_task log when the task runs)
Message: Hello from custom operator!

Common Pitfalls

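  • Forgetting to call super().__init__(**kwargs) in __init__ skips BaseOperator's setup, so attributes such as task_id and the DAG assignment are never initialized and Airflow fails when it builds or runs the task.
  • Not overriding execute() leaves BaseOperator's default in place, which raises NotImplementedError, so the task fails when it runs.
  • print() output does end up in the task log, but self.log (the logger every operator inherits) adds log levels and timestamps and follows Airflow's logging configuration.
  • Omitting task_id when instantiating the operator raises an error, because BaseOperator requires it.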
python
from airflow.models import BaseOperator


# Wrong: __init__ never calls super().__init__(), so BaseOperator's setup
# (task_id, DAG assignment, default_args) is skipped
class BadOperator(BaseOperator):
    def __init__(self, param, **kwargs):
        self.param = param

    def execute(self, context):
        print(self.param)


# Right: forward **kwargs to BaseOperator, then store your own parameters
class GoodOperator(BaseOperator):
    def __init__(self, param, **kwargs):
        super().__init__(**kwargs)
        self.param = param

    def execute(self, context):
        print(self.param)

Quick Reference

Remember these key points when creating custom operators:

  • Subclass BaseOperator.
  • Don't add the @apply_defaults decorator: since Airflow 2.0, BaseOperator applies default_args automatically and the decorator is deprecated.
  • Always call super().__init__(**kwargs) in __init__.
  • Implement execute(self, context) with task logic.
  • Pass task_id when creating operator instances.

Key Takeaways

Create a custom operator by subclassing BaseOperator and overriding execute().
Always call super().__init__(**kwargs) in your operator's __init__ method.
Let BaseOperator apply default_args for you; the @apply_defaults decorator is no longer needed since Airflow 2.0.
Pass task_id when instantiating your custom operator in a DAG.
Prefer self.log over print() so task log entries carry log levels and timestamps.