Complete the code to import the ExternalTaskSensor from Airflow sensors.
from airflow.sensors.external_task import [1]
The correct import is ExternalTaskSensor from airflow.sensors.external_task.
Complete the ExternalTaskSensor to wait for a task named 'task_b' in DAG 'dag_b'.
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task_b',
external_dag_id='[1]',
external_task_id='task_b',
mode='reschedule'
)external_dag_id with external_task_id.The external_dag_id should be the DAG where the task to wait for lives, which is 'dag_b'.
Fix the error in the ExternalTaskSensor mode parameter to avoid blocking the worker.
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task_c',
external_dag_id='dag_c',
external_task_id='task_c',
mode='[1]'
)Using mode='reschedule' allows the sensor to free the worker slot while waiting, preventing blocking.
Fill both blanks to create an ExternalTaskSensor that waits for 'task_x' in 'dag_x' with a 600 seconds timeout.
wait_task = ExternalTaskSensor(
task_id='wait_task_x',
external_dag_id='[1]',
external_task_id='[2]',
timeout=600
)The sensor waits for task_x in dag_x, so those values fill the blanks.
Fill all three blanks to create a dictionary comprehension that maps task ids to their ExternalTaskSensor instances for tasks 'task1' and 'task2' in 'dag_main'.
sensors = {
'[1]': ExternalTaskSensor(
task_id='wait_[1]',
external_dag_id='[2]',
external_task_id='[1]'
)
for [3] in ['task1', 'task2']
}The variable name for iteration is 'task' (option D), the DAG id is 'dag_main' (option B), and the dictionary keys and sensor task ids use the iteration variable 'task'.