How to Create a Custom Operator in Airflow: Simple Guide
To create a custom operator in Airflow, define a new Python class that inherits from BaseOperator and implement its execute() method with your task logic. You can then use the operator in your DAG like any built-in operator.
Syntax
Creating a custom operator means subclassing BaseOperator and overriding execute(), the method where the task's main logic runs. To accept your own parameters, add an __init__() method and call super().__init__(**kwargs) so that BaseOperator still receives task_id and its other standard arguments.
- class MyOperator(BaseOperator): defines the new operator class.
- def __init__(self, param1, **kwargs): stores your custom parameters and forwards the remaining keyword arguments to BaseOperator.
- def execute(self, context): contains the task logic executed by Airflow.
```python
from airflow.models import BaseOperator


class MyOperator(BaseOperator):
    def __init__(self, param1, **kwargs):
        # Forward task_id, retries, etc. to BaseOperator before anything else.
        super().__init__(**kwargs)
        self.param1 = param1

    def execute(self, context):
        # Task logic goes here; self.log is the logger BaseOperator provides.
        self.log.info("Running task with param1=%s", self.param1)
```
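To make the split between __init__() and execute() concrete, here is a toy model in plain Python with no Airflow import. ToyBaseOperator, ToyMyOperator, and run_task are hypothetical stand-ins, not Airflow's API: the constructor only stores configuration, while execute() receives a context dict (the "ds" key mimics the logical-date key in Airflow's real context) only when the task actually runs.

```python
from typing import Any


class ToyBaseOperator:
    """Hypothetical stand-in for Airflow's BaseOperator, reduced to task_id."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def execute(self, context: dict[str, Any]) -> Any:
        # Like the real BaseOperator, the base version refuses to run.
        raise NotImplementedError


class ToyMyOperator(ToyBaseOperator):
    def __init__(self, param1: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)  # the base class consumes task_id
        self.param1 = param1  # store configuration only; no real work here

    def execute(self, context: dict[str, Any]) -> str:
        # The task's actual work happens here, once per run.
        return f"{self.task_id} ran with param1={self.param1} for {context['ds']}"


def run_task(op: ToyBaseOperator, ds: str) -> Any:
    """Hypothetical runner: build a context dict, then call execute()."""
    context = {"ds": ds, "task_id": op.task_id}
    return op.execute(context)


# "Parse time": the DAG file only constructs the operator.
op = ToyMyOperator(task_id="my_task", param1="hello")

# "Run time": the runner calls execute() with a fresh context.
print(run_task(op, ds="2024-01-01"))  # my_task ran with param1=hello for 2024-01-01
```

Because Airflow re-parses DAG files periodically and constructs every operator each time, keeping expensive work (database queries, API calls) out of __init__() and inside execute() keeps parsing fast.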
Example
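This example defines a custom operator that logs a message passed in as a parameter, then uses it as a task in a DAG. It targets Airflow 2.4 or later, where the DAG argument is schedule (older releases use schedule_interval).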
```python
import pendulum

from airflow import DAG
from airflow.models import BaseOperator


class PrintOperator(BaseOperator):
    def __init__(self, message, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # Written to the task log, which is visible in the Airflow UI.
        self.log.info("Message: %s", self.message)


with DAG(
    dag_id="custom_operator_dag",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@once",
) as dag:
    task = PrintOperator(task_id="print_task", message="Hello from custom operator!")
```
Output
When the task runs, its log (viewable in the Airflow UI) contains a line ending in:
Message: Hello from custom operator!
Common Pitfalls
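- Forgetting to call super().__init__(**kwargs) in __init__ means BaseOperator never processes task_id, default_args, and its other arguments, so the operator does not work.
- Not implementing execute() makes the task fail, because BaseOperator's own execute() raises NotImplementedError.
- print() output is usually captured in the task log, but self.log (the logger every operator inherits) adds log levels and consistent formatting, which makes logs easier to read and filter.
- Not passing task_id when instantiating the operator raises an error, because every task needs an ID that is unique within its DAG.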
```python
from airflow.models import BaseOperator


# Wrong: __init__ never calls super().__init__(), so task_id and the
# other BaseOperator arguments in **kwargs are silently dropped.
class BadOperator(BaseOperator):
    def __init__(self, param, **kwargs):
        self.param = param

    def execute(self, context):
        self.log.info("param=%s", self.param)


# Right: forward **kwargs to BaseOperator before setting your own attributes.
class GoodOperator(BaseOperator):
    def __init__(self, param, **kwargs):
        super().__init__(**kwargs)
        self.param = param

    def execute(self, context):
        self.log.info("param=%s", self.param)
```
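The first and last pitfalls can be reproduced without Airflow. In this toy model, ToyBaseOperator, BadToyOperator, and GoodToyOperator are hypothetical (the real BaseOperator does far more setup, and reports a missing task_id with its own error rather than a plain TypeError), but the mechanism is the same: skipping super().__init__() means task_id is never stored, and omitting task_id fails at construction time.

```python
from typing import Any


class ToyBaseOperator:
    """Hypothetical stand-in for BaseOperator; here it only stores task_id."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class BadToyOperator(ToyBaseOperator):
    def __init__(self, param: str, **kwargs: Any) -> None:
        # Wrong: kwargs (including task_id) are silently dropped.
        self.param = param


class GoodToyOperator(ToyBaseOperator):
    def __init__(self, param: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)  # Right: the base class receives task_id
        self.param = param


good = GoodToyOperator(task_id="good_task", param="x")
print(good.task_id)  # good_task

bad = BadToyOperator(task_id="bad_task", param="x")
print(hasattr(bad, "task_id"))  # False: ToyBaseOperator.__init__ never ran

try:
    GoodToyOperator(param="x")  # task_id omitted
except TypeError as exc:
    print(f"TypeError: {exc}")
```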
Quick Reference
Remember these key points when creating custom operators:
- Subclass BaseOperator.
- Skip the @apply_defaults decorator: since Airflow 2.0, BaseOperator applies default_args automatically and the decorator is deprecated.
- Always call super().__init__(**kwargs) in __init__.
- Implement execute(self, context) with the task logic.
- Pass task_id when creating operator instances.
Key Takeaways
Create a custom operator by subclassing BaseOperator and overriding execute().
Always call super().__init__(**kwargs) in your operator's __init__ method.
Skip @apply_defaults in Airflow 2.x; BaseOperator already applies default_args for you.
Pass task_id when instantiating your custom operator in a DAG.
Prefer self.log over print for leveled, consistently formatted task logs.