Airflow · How-To · Beginner · 4 min read

How to Set dag_concurrency in Airflow for Task Control

Set dag_concurrency in the airflow.cfg file under the [core] section to limit the number of task instances that can run concurrently in each DAG. (In Airflow 2.2 and later this option was renamed max_active_tasks_per_dag.) Alternatively, set the max_active_tasks parameter in your DAG definition to control concurrency for a specific DAG.
📐

Syntax

The dag_concurrency setting controls how many task instances of a single DAG can run at the same time. It is set in the airflow.cfg configuration file under the [core] section; from Airflow 2.2 onward the same option is named max_active_tasks_per_dag.

Alternatively, in DAG code, you can control concurrency per DAG using the max_active_tasks parameter when creating the DAG object.

ini/python
[core]
dag_concurrency = 16

# In DAG definition
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

dag = DAG(
    'example_dag',
    start_date=datetime(2024, 1, 1),
    max_active_tasks=10
)

start = DummyOperator(task_id='start', dag=dag)
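If you want to confirm what value is actually configured, one option is to parse airflow.cfg yourself. The sketch below is not part of Airflow; it simply uses the standard-library configparser (airflow.cfg is ini-format) against an inline sample config:

```python
# Minimal sketch: read [core] dag_concurrency from an airflow.cfg-style
# ini file using the standard library's configparser.
from configparser import ConfigParser

AIRFLOW_CFG = """\
[core]
dag_concurrency = 16
"""

parser = ConfigParser()
parser.read_string(AIRFLOW_CFG)  # for a real file: parser.read("/path/to/airflow.cfg")

print(parser.getint("core", "dag_concurrency"))  # → 16
```

On a live installation you can also run `airflow config get-value core dag_concurrency` from the CLI to see the effective value.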
💻

Example

This example shows how to set dag_concurrency in airflow.cfg and how to limit concurrent tasks in a DAG using max_active_tasks. It demonstrates controlling task concurrency globally and per DAG.

ini/python
# airflow.cfg snippet
[core]
dag_concurrency = 5

# DAG file example
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'limit_concurrency_dag',
    start_date=datetime(2024, 1, 1),
    max_active_tasks=3
)

t1 = BashOperator(task_id='task1', bash_command='echo Task 1', dag=dag)
t2 = BashOperator(task_id='task2', bash_command='echo Task 2', dag=dag)
t3 = BashOperator(task_id='task3', bash_command='echo Task 3', dag=dag)
t4 = BashOperator(task_id='task4', bash_command='echo Task 4', dag=dag)
t5 = BashOperator(task_id='task5', bash_command='echo Task 5', dag=dag)
Output
When running, Airflow allows up to 5 concurrent task instances per DAG by default (the airflow.cfg setting is a per-DAG limit, not a limit across all DAGs), but this particular DAG is capped at 3 concurrent tasks because max_active_tasks=3 overrides the default.
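The capping behavior can be illustrated outside Airflow. The sketch below is plain Python (it does not use Airflow's scheduler): a semaphore of size 3 stands in for max_active_tasks=3, so at most 3 of the 5 simulated tasks ever run at once:

```python
# Illustration only: a semaphore standing in for a max_active_tasks=3 cap.
import threading
import time

CAP = threading.Semaphore(3)   # per-DAG concurrency cap
lock = threading.Lock()
running = 0                    # tasks currently "executing"
peak = 0                       # highest concurrency observed

def task(name):
    global running, peak
    with CAP:                  # the 4th and 5th tasks block here until a slot frees
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)       # simulate work
        with lock:
            running -= 1

threads = [threading.Thread(target=task, args=(f"task{i}",)) for i in range(1, 6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never exceeds 3
```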
⚠️

Common Pitfalls

  • Setting dag_concurrency too high can overload your worker resources.
  • Confusing dag_concurrency with max_active_runs_per_dag, which limits concurrent DAG runs, not tasks.
  • Forgetting to restart Airflow scheduler after changing airflow.cfg to apply new dag_concurrency value.
  • Not setting max_active_tasks in DAG code when you want DAG-specific concurrency control.
python
## Wrong: setting dag_concurrency on the DAG object (does nothing)
from airflow import DAG
from datetime import datetime

dag = DAG('wrong_dag', start_date=datetime(2024, 1, 1))
dag.dag_concurrency = 10  # Ignored — not a real DAG attribute or parameter

## Right: use the max_active_tasks parameter

dag = DAG('right_dag', start_date=datetime(2024, 1, 1), max_active_tasks=10)
📊

Quick Reference

| Setting | Description | Location | Default |
|---|---|---|---|
| dag_concurrency | Max concurrent task instances per DAG | airflow.cfg [core] | 16 |
| max_active_tasks | Max concurrent tasks for a specific DAG | DAG definition | Falls back to dag_concurrency (16) |
| max_active_runs_per_dag | Max concurrent runs of the same DAG | airflow.cfg [core] or DAG code (max_active_runs) | 16 |
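For context, here is how the related [core] options fit together (values shown are the Airflow 2.x defaults; the comments summarize scope as described above, and parallelism is the setting that actually caps tasks across all DAGs):

```ini
[core]
# Default per-DAG task cap (renamed max_active_tasks_per_dag in Airflow 2.2+)
dag_concurrency = 16
# Cap on concurrent runs of the same DAG
max_active_runs_per_dag = 16
# Global cap on running task instances across ALL DAGs
parallelism = 32
```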

Key Takeaways

  • Set dag_concurrency in airflow.cfg under [core] to set the default per-DAG task-concurrency limit for all DAGs.
  • Use max_active_tasks in DAG code to override that limit for a specific DAG.
  • Restart the Airflow scheduler after changing airflow.cfg for the new value to take effect.
  • Do not confuse dag_concurrency with max_active_runs_per_dag; the former limits tasks, the latter limits DAG runs.
  • Setting concurrency too high can overload your Airflow workers.