How to Create a Custom Sensor in Apache Airflow
To create a custom sensor in Apache Airflow, subclass BaseSensorOperator and implement the poke() method with your condition logic. Then use the custom sensor in your DAG to wait for a specific event or state before proceeding.
Syntax
Creating a custom sensor involves subclassing BaseSensorOperator and overriding the poke() method. The poke() method should return True when the sensor condition is met, otherwise False. The sensor will keep checking at intervals defined by poke_interval.
- BaseSensorOperator: The base class for all sensors.
- poke(self, context): Method to check the condition; must return a boolean.
- poke_interval: Time in seconds between checks.
- timeout: Maximum time in seconds to wait before failing.
```python
from airflow.sensors.base import BaseSensorOperator


class CustomSensor(BaseSensorOperator):
    # Recent Airflow 2 releases apply default_args automatically, so the
    # deprecated @apply_defaults decorator is omitted here.
    def __init__(self, param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.param = param

    def poke(self, context):
        # Your condition logic here; the result must be a boolean
        return self.param == 'expected_value'
```
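The way poke(), poke_interval, and timeout interact can be pictured with a small stand-alone loop. This is only a simplified model for illustration and not Airflow's actual implementation: run_sensor and SensorTimeout are hypothetical names, and the loop ignores features such as reschedule mode.

```python
import time


class SensorTimeout(Exception):
    """Hypothetical stand-in for the timeout error a real sensor raises."""


def run_sensor(poke, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call poke() until it returns a truthy value or `timeout` seconds pass.

    Returns the number of poke() calls that were needed.
    """
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts  # condition met, the sensor succeeds
        if clock() - started > timeout:
            raise SensorTimeout(f"gave up after {attempts} attempts")
        sleep(poke_interval)  # wait before the next check


# Demo: the condition becomes true on the third check.
results = iter([False, False, True])
attempts = run_sensor(lambda: next(results), poke_interval=0.01, timeout=1)
print(attempts)  # 3
```

Note that a poke() returning None behaves like False in this model: the loop simply keeps checking until the timeout is reached.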
Example
This example shows a custom sensor that waits until a file exists at a given path. It checks every 10 seconds and times out after 60 seconds.
```python
import os
from datetime import datetime

from airflow import DAG
from airflow.sensors.base import BaseSensorOperator


class FileExistenceSensor(BaseSensorOperator):
    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # True once the file is present, False otherwise
        return os.path.exists(self.filepath)


# A fixed start_date is used instead of the deprecated days_ago() helper.
with DAG(dag_id='file_sensor_dag', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    wait_for_file = FileExistenceSensor(
        task_id='wait_for_file',
        filepath='/tmp/testfile.txt',
        poke_interval=10,  # check every 10 seconds
        timeout=60,        # fail after 60 seconds without success
    )
```
Output
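The 'wait_for_file' task keeps running and checks for '/tmp/testfile.txt' every 10 seconds. It succeeds as soon as the file exists, and fails with an AirflowSensorTimeout error if the file has not appeared after 60 seconds.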
Common Pitfalls
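- Returning None from poke() (for example, by forgetting the return statement) does not raise an error: None is falsy, so the sensor keeps poking until it times out. Always return True or False.
- Setting poke_interval too low causes very frequent checks, which can put unnecessary load on Airflow and on the system being checked.
- For long waits, set timeout explicitly. The default is 7 days, so a sensor whose condition is never met can occupy a worker slot for a long time in the default poke mode.
- Catch expected, transient exceptions inside poke() and return False so the sensor retries on the next check instead of failing.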
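```python
from airflow.sensors.base import BaseSensorOperator


class BadSensor(BaseSensorOperator):
    def poke(self, context):
        # Wrong: implicitly returns None, which is falsy,
        # so the sensor never succeeds and eventually times out
        pass


class GoodSensor(BaseSensorOperator):
    def poke(self, context):
        # Correct: returns an explicit boolean
        return True
```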
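The exception-handling advice above can be sketched without Airflow. The helper below is a hypothetical illustration (safe_poke is not part of Airflow, and the choice of ConnectionError and TimeoutError as "transient" errors is an assumption): it wraps a condition check so that expected, transient errors count as "not ready yet" and the result is always a real boolean.

```python
import logging

log = logging.getLogger(__name__)


def safe_poke(check, transient_errors=(ConnectionError, TimeoutError)):
    """Run check() and always return a bool.

    Errors listed in `transient_errors` are logged and treated as
    "condition not met"; any other exception still propagates.
    """
    try:
        return bool(check())
    except transient_errors as exc:
        log.warning("Check failed, will retry on the next poke: %s", exc)
        return False


def flaky_check():
    # Simulates an external service that is temporarily unreachable.
    raise ConnectionError("service unavailable")


print(safe_poke(flaky_check))       # False
print(safe_poke(lambda: None))      # False
print(safe_poke(lambda: "ready"))   # True
```

Inside a sensor, the same pattern would live in the body of poke(). Letting unexpected exceptions propagate is a deliberate choice here, so that genuine bugs fail the task instead of being silently retried until the timeout.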
Quick Reference
| Name | Description |
|---|---|
| poke() | Method to check the sensor condition; must return True or False |
| poke_interval | Seconds between each poke call (default 60) |
| timeout | Maximum seconds to wait before failing the sensor (default 7 days) |
| BaseSensorOperator | Base class to inherit when creating custom sensors |
| apply_defaults | Legacy decorator for applying default arguments; deprecated in recent Airflow 2 releases, which apply defaults automatically |
Key Takeaways
Subclass BaseSensorOperator and override poke() to create a custom sensor.
poke() must return True when the condition is met, otherwise False.
Set poke_interval and timeout to control sensor check frequency and max wait time.
Handle exceptions inside poke() to avoid sensor crashes.
Use custom sensors in DAGs to wait for specific external conditions.