
ExternalTaskSensor for cross-DAG dependencies in Apache Airflow - Commands & Configuration

Introduction
Sometimes one workflow must wait for another to finish before it starts. ExternalTaskSensor handles this by holding a task in one DAG until a task in a different DAG completes. Use it:
When you have two separate workflows and one needs to start only after the other finishes successfully.
When you want to coordinate data processing steps that happen in different DAGs.
When you want to avoid running a task before its required data is ready from another workflow.
When you want to build modular workflows that depend on each other without merging them into one big DAG.
Config File - cross_dag_sensor.py
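from datetime import datetime

from airflow import DAG
# EmptyOperator replaces the deprecated DummyOperator (Airflow 2.3+)
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='dependent_dag',
    start_date=datetime(2024, 6, 1),
    schedule_interval='@daily',  # keep in step with source_dag so logical dates line up
    catchup=False
) as dag:

    # Wait for final_task in the source_dag run with the same logical date.
    wait_for_task = ExternalTaskSensor(
        task_id='wait_for_task_in_other_dag',
        external_dag_id='source_dag',
        external_task_id='final_task',
        allowed_states=['success'],           # states that let the sensor succeed
        failed_states=['failed', 'skipped'],  # states that make the sensor fail
        mode='reschedule',                    # release the worker slot between checks
        poke_interval=30,                     # seconds between checks
        timeout=600                           # give up after 10 minutes
    )

    # Placeholder for the real downstream work.
    proceed = EmptyOperator(task_id='proceed_after_wait')

    wait_for_task >> proceed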

This DAG named dependent_dag waits for a task called final_task in another DAG named source_dag to complete successfully before continuing.

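ExternalTaskSensor holds the downstream task until the external task finishes with success. By default it looks for the source_dag run that has the same logical date as its own run, so both DAGs should share a schedule; if they do not, set execution_delta or execution_date_fn on the sensor.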

mode='reschedule' lets Airflow free up worker slots while waiting.

poke_interval=30 means it checks every 30 seconds.

timeout=600 means the sensor fails after 10 minutes if the external task has not succeeded by then.
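How poke_interval and timeout interact can be sketched with a small polling loop. This is a simplified model in plain Python, not Airflow's implementation: the function name wait_for and the injected sleep callable are assumptions made for illustration, and the exact boundary at which Airflow raises its timeout may differ slightly.

```python
from typing import Callable, List


def wait_for(check: Callable[[], bool], poke_interval: float, timeout: float,
             sleep: Callable[[float], None]) -> int:
    """Call check() every poke_interval seconds until it returns True.

    Returns the number of checks made. Raises TimeoutError once `timeout`
    seconds have been spent waiting without success.
    """
    waited = 0.0
    checks = 0
    while True:
        checks += 1
        if check():
            return checks
        if waited >= timeout:
            raise TimeoutError(f"gave up after {waited:.0f}s and {checks} checks")
        sleep(poke_interval)
        waited += poke_interval


# Simulated run: the external task is "done" on the third check.
naps: List[float] = []  # records each sleep instead of really sleeping
answers = iter([False, False, True])
checks_needed = wait_for(lambda: next(answers), poke_interval=30, timeout=600,
                         sleep=naps.append)
print(checks_needed, sum(naps))  # prints: 3 60
```

In this model the sensor succeeds on its third check after 60 seconds of waiting. If the check never returned True, the loop would raise after 600 seconds, which is the role timeout=600 plays in the DAG above.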

Commands
List all available DAGs to confirm both source_dag and dependent_dag are present.
Terminal
airflow dags list
Expected Output
source_dag dependent_dag
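Trigger a run of the source DAG so that final_task, the task dependent_dag waits for, gets executed. Pass the logical date explicitly (-e is short for --exec-date in Airflow 2.x) so the run matches the date the sensor will look up.
Terminal
airflow dags trigger source_dag -e 2024-06-01
Expected Output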
Created <DagRun source_dag @ 2024-06-01T00:00:00+00:00: manual__2024-06-01T00:00:00+00:00, externally triggered: True>
List tasks in dependent_dag to verify the ExternalTaskSensor task and the next task.
Terminal
airflow tasks list dependent_dag
Expected Output
proceed_after_wait
wait_for_task_in_other_dag
Test the ExternalTaskSensor task manually for the given date to see if it detects the external task completion.
Terminal
airflow tasks test dependent_dag wait_for_task_in_other_dag 2024-06-01
Expected Output
[2024-06-01 00:00:00,000] {external_task.py:100} INFO - Poking for task final_task in DAG source_dag
[2024-06-01 00:00:30,000] {external_task.py:100} INFO - External task final_task in DAG source_dag succeeded
[2024-06-01 00:00:30,001] {taskinstance.py:1234} INFO - Task succeeded
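The date passed to the test command matters because the sensor looks up the external task by logical date. The sketch below models that lookup in plain Python. execution_delta is a real ExternalTaskSensor parameter, but the helper external_logical_date is a hypothetical name used only here, and the two-hour offset is an assumed example.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def external_logical_date(own_logical_date: datetime,
                          execution_delta: Optional[timedelta] = None) -> datetime:
    """Logical date of the external run the sensor looks for.

    With no execution_delta the sensor expects the same logical date as its
    own run; with one, it looks that far back in time.
    """
    if execution_delta is None:
        return own_logical_date
    return own_logical_date - execution_delta


run = datetime(2024, 6, 1, tzinfo=timezone.utc)
same = external_logical_date(run)
earlier = external_logical_date(run, timedelta(hours=2))
print(same.isoformat())     # prints: 2024-06-01T00:00:00+00:00
print(earlier.isoformat())  # prints: 2024-05-31T22:00:00+00:00
```

A source_dag run whose logical date is anything else, for example a manual run stamped with the current time, would not be found by a sensor run dated 2024-06-01, and the sensor would keep poking until it times out.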
Key Concept

If you remember nothing else from this pattern, remember: ExternalTaskSensor pauses a task until a specific task in another DAG finishes successfully.

Common Mistakes
Using ExternalTaskSensor with wrong external_dag_id or external_task_id.
The sensor never finds a matching task instance, so it keeps checking until it times out and fails.
Double-check the exact DAG ID and task ID names in the other DAG before configuring the sensor.
Not setting mode='reschedule' for ExternalTaskSensor.
In the default poke mode, the sensor occupies a worker slot for the whole time it waits, which wastes capacity on long waits.
Use mode='reschedule' for long waits so the worker slot is released between checks.
Not handling failed_states in ExternalTaskSensor.
If the external task fails or is skipped, the sensor keeps waiting until its own timeout expires instead of failing right away.
Set failed_states=['failed', 'skipped'] to make the sensor fail fast if the external task does not succeed.
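The effect of failed_states can be illustrated with a small decision function. This is a simplified model of what happens after one check of the external task, not Airflow's code; the name next_action and its return strings are assumptions made for illustration.

```python
from typing import Iterable


def next_action(state: str, allowed_states: Iterable[str],
                failed_states: Iterable[str]) -> str:
    """Simplified decision a sensor makes after one check of the external task."""
    if state in set(failed_states):
        return "fail now"
    if state in set(allowed_states):
        return "succeed"
    return "keep waiting"


# With no failed_states, a failed external task just looks "not done yet".
print(next_action("failed", ["success"], []))                      # prints: keep waiting
# With failed_states, the same observation ends the wait at once.
print(next_action("failed", ["success"], ["failed", "skipped"]))   # prints: fail now
print(next_action("success", ["success"], ["failed", "skipped"]))  # prints: succeed
```

The first call shows why omitting failed_states is costly: the sensor has no way to tell a failed upstream task from one that is still running, so it waits out the full timeout.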
Summary
Use ExternalTaskSensor in one DAG to wait for a task in another DAG to finish.
Configure external_dag_id and external_task_id to point to the correct external workflow and task.
Set mode='reschedule' and handle allowed_states and failed_states for efficient and safe waiting.