
How to Create a Custom Sensor in Apache Airflow

To create a custom sensor in Apache Airflow, subclass BaseSensorOperator and implement the poke() method with your condition logic. Then use this custom sensor in your DAG to wait for specific events or states before proceeding.

Syntax

Creating a custom sensor involves subclassing BaseSensorOperator and overriding its poke() method. poke() should return True when the sensor condition is met and False otherwise. Airflow calls poke() repeatedly, waiting poke_interval seconds between calls, until it returns True or the timeout elapses.

  • BaseSensorOperator: The base class for all sensors.
  • poke(self, context): Method that checks the condition; must return True or False.
  • poke_interval: Time in seconds between checks (default 60).
  • timeout: Maximum time in seconds to wait before the sensor fails (default seven days).
```python
from airflow.sensors.base import BaseSensorOperator


class CustomSensor(BaseSensorOperator):
    # No @apply_defaults decorator: since Airflow 2.0, default arguments
    # are applied automatically and the decorator is deprecated.
    def __init__(self, param, **kwargs):
        # kwargs carries task_id, poke_interval, timeout, mode, etc.
        super().__init__(**kwargs)
        self.param = param

    def poke(self, context):
        # Your condition logic here: True means "done",
        # False means "check again after poke_interval".
        return self.param == "expected_value"
```
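To make the poke_interval and timeout behaviour concrete, here is a simplified, pure-Python model of the loop a sensor runs in the default "poke" mode. It is a sketch, not Airflow's implementation: run_poke_loop, SensorTimeout (a stand-in for Airflow's AirflowSensorTimeout) and FakeClock are hypothetical names, and the injectable clock and sleep functions exist only so the model runs instantly. Like Airflow, it only checks whether poke()'s result is truthy.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for Airflow's AirflowSensorTimeout."""


def run_poke_loop(
    poke: Callable[[], object],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call poke() until it returns a truthy value; return how many calls it took.

    Raises SensorTimeout once more than `timeout` seconds have elapsed
    without success. Simplified model of poke mode, not Airflow code.
    """
    started = clock()
    calls = 0
    while True:
        calls += 1
        if poke():  # only truthiness matters, so None counts as "not met yet"
            return calls
        if clock() - started > timeout:
            raise SensorTimeout(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


class FakeClock:
    """Simulated clock so the example runs instantly instead of sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


if __name__ == "__main__":
    clock = FakeClock()
    # The condition becomes true 30 simulated seconds in; poke_interval=10, timeout=60.
    calls = run_poke_loop(lambda: clock.now >= 30, 10, 60, clock.time, clock.sleep)
    print(f"condition met after {calls} pokes at t={clock.now:.0f}s")
    # prints: condition met after 4 pokes at t=30s
```

With the same settings, a poke() that never returns a truthy value (for example one that falls through and returns None) is called eight times, at t=0 through t=70, before the timeout is raised.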

Example

This example shows a custom sensor that waits until a file exists at a given path. It checks every 10 seconds and times out after 60 seconds.

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.sensors.base import BaseSensorOperator


class FileExistenceSensor(BaseSensorOperator):
    """Succeeds once a file exists at `filepath`."""

    def __init__(self, filepath, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context):
        self.log.info("Checking for %s", self.filepath)
        return os.path.exists(self.filepath)


with DAG(
    dag_id="file_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # Airflow 2.4+; older 2.x versions use schedule_interval=None
    catchup=False,
) as dag:
    wait_for_file = FileExistenceSensor(
        task_id="wait_for_file",
        filepath="/tmp/testfile.txt",
        poke_interval=10,  # check every 10 seconds
        timeout=60,  # fail if the file has not appeared after 60 seconds
    )
```
Output
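The wait_for_file task keeps running, checking every 10 seconds, until /tmp/testfile.txt exists. If the file has not appeared after 60 seconds, the task fails with AirflowSensorTimeout and downstream tasks do not run. Because poke() runs on the worker executing the task, the path must be visible on that machine.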

Common Pitfalls

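  • Always return an explicit True or False from poke(). A poke() that falls through returns None, which Airflow treats as "condition not met", so the sensor never succeeds and keeps checking until it times out.
  • A very low poke_interval hammers the system being checked and, in "reschedule" mode, adds load on the scheduler. In the default "poke" mode the sensor also occupies a worker slot for the entire wait; for long waits consider mode="reschedule", which releases the slot between checks.
  • The default timeout is seven days, so a sensor whose condition never comes true can hold resources for a long time. Set timeout to the longest wait you actually expect.
  • An unhandled exception in poke() fails the task attempt (it is retried only if retries are configured). Catch errors you expect to be transient and return False so the sensor tries again on the next check.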
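```python
from airflow.sensors.base import BaseSensorOperator


class BadSensor(BaseSensorOperator):
    def poke(self, context):
        # Wrong: implicitly returns None, which counts as "not met",
        # so this sensor can never succeed and only stops at its timeout.
        pass


class GoodSensor(BaseSensorOperator):
    def poke(self, context):
        # Correct: always return an explicit boolean.
        return True
```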
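One way to follow the exception-handling advice without hiding real bugs is to catch only the errors you expect to be transient and report them as "not met yet". The sketch below is plain Python so it can be tried without an Airflow installation; safe_check and TRANSIENT_ERRORS are hypothetical names, and treating every OSError as transient is an assumption you should narrow for your own sensor.

```python
import logging
from typing import Callable

log = logging.getLogger(__name__)

# Hypothetical choice: treat I/O and network-style errors (OSError and its
# subclasses, such as FileNotFoundError and ConnectionError) as transient.
TRANSIENT_ERRORS = (OSError,)


def safe_check(check: Callable[[], object]) -> bool:
    """Run a condition check for poke(), turning transient errors into False.

    Returning False lets the sensor try again after poke_interval instead of
    failing the task. Any other exception still propagates and fails it.
    """
    try:
        return bool(check())  # bool() also turns an accidental None into False
    except TRANSIENT_ERRORS as exc:
        log.warning("Condition check failed, will retry on next poke: %s", exc)
        return False
```

Inside a custom sensor, poke() could then return something like safe_check(lambda: os.path.getsize(self.filepath) > 0): a missing file raises FileNotFoundError and is reported as "not yet", while an unexpected error such as a ValueError from bad configuration still fails the task so the bug stays visible.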

Quick Reference

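| Name | Description |
| --- | --- |
| poke() | Method that checks the sensor condition; must return True or False |
| poke_interval | Seconds between poke() calls (default 60) |
| timeout | Maximum seconds to wait before the sensor fails (default 604800, i.e. seven days) |
| mode | "poke" (default) holds a worker slot for the whole wait; "reschedule" frees it between checks |
| BaseSensorOperator | Base class to inherit when creating custom sensors |
| apply_defaults | Deprecated since Airflow 2.0 and not needed; default arguments are applied automatically |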

Key Takeaways

  • Subclass BaseSensorOperator and override poke() to create a custom sensor.
  • poke() must return True when the condition is met and False otherwise; a missing return value (None) never counts as success.
  • Set poke_interval and timeout to control how often the sensor checks and how long it waits before failing.
  • Catch expected, transient errors inside poke() and return False so they do not fail the task.
  • Use custom sensors in DAGs to wait for specific external conditions.