
How to Use cross_downstream in Airflow for Task Dependencies

In Airflow, cross_downstream is a helper function that sets dependencies from multiple upstream tasks to multiple downstream tasks as a cross product. It connects each upstream task to every downstream task, which keeps many-to-many dependency setups short.

Syntax

cross_downstream is a standalone function, not a method on Python lists. In Airflow 2.x it is imported from airflow.models.baseoperator and takes two arguments: a list of upstream tasks and a list of downstream tasks. It sets dependencies so that each upstream task runs before every downstream task.

  • from_tasks: List of tasks that should run first.
  • to_tasks: List of tasks that should run after all upstream tasks.
python
from airflow.models.baseoperator import cross_downstream

cross_downstream(upstream_tasks, downstream_tasks)
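
To make the cross-product behaviour concrete, here is a minimal pure-Python sketch that runs without Airflow installed. The Task class and cross_downstream_sketch function are hypothetical stand-ins written for illustration; they are not Airflow's classes or its actual implementation.

```python
# Pure-Python illustration only. Task and cross_downstream_sketch are
# hypothetical stand-ins, not Airflow classes or functions.


class Task:
    """Minimal stand-in for a task that records its downstream task IDs."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_task_ids = set()

    def set_downstream(self, tasks):
        # Link this one task to each task in the given list (one-to-many)
        for task in tasks:
            self.downstream_task_ids.add(task.task_id)


def cross_downstream_sketch(from_tasks, to_tasks):
    """Link every task in from_tasks to every task in to_tasks."""
    for task in from_tasks:
        task.set_downstream(to_tasks)


upstream = [Task("upstream_1"), Task("upstream_2")]
downstream = [Task("downstream_1"), Task("downstream_2")]
cross_downstream_sketch(upstream, downstream)

for task in upstream:
    print(task.task_id, "->", sorted(task.downstream_task_ids))
```

The sketch shows that a many-to-many link is just a loop of one-to-many links, which is why the first argument needs to be something that can be iterated over.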

Example

This example shows how to use cross_downstream to connect two upstream tasks to two downstream tasks. Each upstream task will run before both downstream tasks.

python
from airflow import DAG
from airflow.models.baseoperator import cross_downstream
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('cross_downstream_example', default_args=default_args, schedule_interval=None)

# Define upstream tasks
upstream_tasks = [DummyOperator(task_id='upstream_1', dag=dag),
                  DummyOperator(task_id='upstream_2', dag=dag)]

# Define downstream tasks
downstream_tasks = [DummyOperator(task_id='downstream_1', dag=dag),
                    DummyOperator(task_id='downstream_2', dag=dag)]

# Set cross dependencies: every upstream task precedes every downstream task
cross_downstream(upstream_tasks, downstream_tasks)
Output
DAG 'cross_downstream_example' created with tasks:
- upstream_1
- upstream_2
- downstream_1
- downstream_2
Dependencies:
upstream_1 -> downstream_1
upstream_1 -> downstream_2
upstream_2 -> downstream_1
upstream_2 -> downstream_2
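
The four dependencies listed above are the Cartesian product of the two task lists, so the edge count is the number of upstream tasks multiplied by the number of downstream tasks. As a quick check that needs no Airflow installation, the same pairs can be enumerated with itertools.product. The variable names here are illustrative only.

```python
from itertools import product

# Task IDs from the example above
upstream_ids = ["upstream_1", "upstream_2"]
downstream_ids = ["downstream_1", "downstream_2"]

# Every (upstream, downstream) pair corresponds to one dependency edge
edges = list(product(upstream_ids, downstream_ids))

for up, down in edges:
    print(f"{up} -> {down}")

print("edge count:", len(edges))  # 2 upstream x 2 downstream = 4
```

With larger lists the edge count grows quickly (for instance, 10 upstream and 10 downstream tasks give 100 edges), which is worth keeping in mind when reading the graph view.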

Common Pitfalls

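Common mistakes when using cross_downstream include:

  • Calling it as a method on a list, as in upstream_tasks.cross_downstream(...). Python lists have no such method, so this raises an AttributeError; cross_downstream is a function that must be imported.
  • Passing a single task as the first argument instead of a list. The function iterates over its first argument, so a bare task fails.
  • Confusing cross_downstream with set_downstream, which is called on one task and links it to one or more downstream tasks.
  • Using the wrong import path. In Airflow 2.x the function lives in airflow.models.baseoperator; other major versions expose it from a different module, so check the documentation for your release.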
python
from airflow.models.baseoperator import cross_downstream
from airflow.operators.dummy import DummyOperator

# Assumes `dag` is an existing DAG object, as in the example above
upstream_task = DummyOperator(task_id='upstream', dag=dag)
downstream_task = DummyOperator(task_id='downstream', dag=dag)

# Wrong: Python lists have no cross_downstream method (raises AttributeError)
# [upstream_task].cross_downstream([downstream_task])

# Wrong: the first argument must be a list, because the function iterates over it
# cross_downstream(upstream_task, [downstream_task])

# Correct: call the imported function with two lists
cross_downstream([upstream_task], [downstream_task])
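
The single-task pitfall can be reproduced without Airflow. The sketch below uses a hypothetical Task stand-in and a hypothetical cross_downstream_sketch function that loops over its first argument. It assumes the real helper behaves similarly; the exact exception Airflow raises may differ by version.

```python
# Pure-Python illustration only. Task and cross_downstream_sketch are
# hypothetical stand-ins, not Airflow classes or functions.


class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_task_ids = set()

    def set_downstream(self, tasks):
        for task in tasks:
            self.downstream_task_ids.add(task.task_id)


def cross_downstream_sketch(from_tasks, to_tasks):
    # Iterating here is what makes a bare, non-iterable task fail
    for task in from_tasks:
        task.set_downstream(to_tasks)


single = Task("upstream")
target = Task("downstream")

try:
    cross_downstream_sketch(single, [target])  # bare task, not a list
    failed = False
except TypeError as exc:
    failed = True
    print("failed as expected:", exc)

# Wrapping the task in a list fixes the call
cross_downstream_sketch([single], [target])
print(sorted(single.downstream_task_ids))
```

Wrapping even a single task in a list keeps the call shape consistent, so nothing else has to change when more upstream tasks are added later.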

Quick Reference

Function / Method | Description | Usage Example
cross_downstream | Sets dependencies from each upstream task to every downstream task | cross_downstream(upstream_tasks, downstream_tasks)
set_downstream | Sets dependency from one task to another | task1.set_downstream(task2)
set_upstream | Sets dependency from one task to another (reverse) | task2.set_upstream(task1)

Key Takeaways

Call cross_downstream(upstream_tasks, downstream_tasks) to connect all upstream tasks to all downstream tasks.
Pass the upstream tasks as a list, even when there is only one, because the function iterates over its first argument.
cross_downstream simplifies setting many-to-many dependencies in Airflow DAGs.
Import cross_downstream from the module that matches your Airflow version (airflow.models.baseoperator in Airflow 2.x).
For one-to-one dependencies, use set_downstream or set_upstream instead.