How to Set dag_concurrency in Airflow for Task Control
Set dag_concurrency in the airflow.cfg file under the [core] section to limit the number of task instances running concurrently per DAG. Alternatively, set the max_active_tasks parameter in your DAG definition to control concurrency for an individual DAG.

Syntax
The dag_concurrency setting controls how many task instances of a single DAG can run at the same time. It is set in the airflow.cfg configuration file under the [core] section.
Alternatively, in DAG code, you can control concurrency per DAG using the max_active_tasks parameter when creating the DAG object.
```ini
[core]
dag_concurrency = 16
```

```python
# In a DAG definition
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

dag = DAG(
    'example_dag',
    start_date=datetime(2024, 1, 1),
    max_active_tasks=10,
)

start = DummyOperator(task_id='start', dag=dag)
```
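If you configure Airflow through environment variables instead of editing airflow.cfg, the same option can be supplied using Airflow's AIRFLOW__&lt;SECTION&gt;__&lt;KEY&gt; naming convention, which overrides the file value:

```shell
# Equivalent to setting dag_concurrency = 16 under [core] in airflow.cfg.
# Airflow reads environment variables named AIRFLOW__<SECTION>__<KEY>.
export AIRFLOW__CORE__DAG_CONCURRENCY=16
echo "dag_concurrency is $AIRFLOW__CORE__DAG_CONCURRENCY"
```

This is convenient in containerized deployments, where the config file is often baked into the image but environment variables can be set per environment.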
Example
This example shows how to set dag_concurrency in airflow.cfg and how to limit concurrent tasks in a DAG using max_active_tasks. It demonstrates controlling task concurrency globally and per DAG.
```ini
# airflow.cfg snippet
[core]
dag_concurrency = 5
```

```python
# DAG file example
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'limit_concurrency_dag',
    start_date=datetime(2024, 1, 1),
    max_active_tasks=3,
)

t1 = BashOperator(task_id='task1', bash_command='echo Task 1', dag=dag)
t2 = BashOperator(task_id='task2', bash_command='echo Task 2', dag=dag)
t3 = BashOperator(task_id='task3', bash_command='echo Task 3', dag=dag)
t4 = BashOperator(task_id='task4', bash_command='echo Task 4', dag=dag)
t5 = BashOperator(task_id='task5', bash_command='echo Task 5', dag=dag)
```
Output
When running, dag_concurrency = 5 in airflow.cfg caps each DAG at 5 concurrent task instances by default, but this DAG runs at most 3 tasks at a time because max_active_tasks=3 overrides the default for it. Note that dag_concurrency is a per-DAG limit, not a global one; the cap across all DAGs is controlled by the separate parallelism setting.
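Conceptually, the per-DAG limit behaves like a counting semaphore: a task instance only starts once one of the DAG's slots is free. The following stand-alone model illustrates the effect of max_active_tasks=3 with plain Python threads; it is a simplified sketch of the idea, not Airflow scheduler code:

```python
import threading
import time

MAX_ACTIVE_TASKS = 3                   # analogous to max_active_tasks=3
slots = threading.Semaphore(MAX_ACTIVE_TASKS)
lock = threading.Lock()
running = 0
peak = 0

def task(name):
    """Simulated task instance: waits for a free slot, then 'runs'."""
    global running, peak
    with slots:                        # blocks until a slot is available
        with lock:
            running += 1
            peak = max(peak, running)  # record highest observed concurrency
        time.sleep(0.05)               # simulate work
        with lock:
            running -= 1

# Five tasks are ready, but at most three ever run at once.
threads = [threading.Thread(target=task, args=(f"task{i}",)) for i in range(1, 6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrency: {peak}")     # never exceeds MAX_ACTIVE_TASKS
```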
Common Pitfalls
- Setting dag_concurrency too high can overload your worker resources.
- Confusing dag_concurrency with max_active_runs_per_dag, which limits concurrent DAG runs, not tasks.
- Forgetting to restart the Airflow scheduler after changing airflow.cfg, so the new dag_concurrency value never takes effect.
- Not setting max_active_tasks in DAG code when you want DAG-specific concurrency control.
python

```python
from airflow import DAG
from datetime import datetime

## Wrong: setting dag_concurrency in DAG code (does nothing)
# This does NOT control concurrency
dag = DAG('wrong_dag', start_date=datetime(2024, 1, 1))
dag.dag_concurrency = 10  # ignored by the scheduler

## Right: use the max_active_tasks parameter
dag = DAG('right_dag', start_date=datetime(2024, 1, 1), max_active_tasks=10)
```
Quick Reference
| Setting | Description | Location | Default |
|---|---|---|---|
| dag_concurrency | Max concurrent task instances per DAG (renamed max_active_tasks_per_dag in Airflow 2.2+) | airflow.cfg [core] | 16 |
| max_active_tasks | Max concurrent tasks per DAG (in DAG code) | DAG definition | Falls back to dag_concurrency (16) |
| max_active_runs_per_dag | Max concurrent DAG runs | airflow.cfg or DAG code | 16 |
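To check which value the scheduler will pick up, you can parse the config file directly with Python's standard library. The snippet below uses an inline string as a stand-in for the file contents; in practice you would read $AIRFLOW_HOME/airflow.cfg (the path and contents here are illustrative assumptions):

```python
import configparser

# Stand-in for the contents of $AIRFLOW_HOME/airflow.cfg.
cfg_text = """
[core]
dag_concurrency = 5
"""

parser = configparser.ConfigParser()
parser.read_string(cfg_text)  # use parser.read(path) for a real file
value = parser.getint("core", "dag_concurrency")
print(f"effective dag_concurrency: {value}")  # → effective dag_concurrency: 5
```

Remember that environment variables of the form AIRFLOW__CORE__DAG_CONCURRENCY override the file, so checking the file alone may not show the effective value.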
Key Takeaways
Set dag_concurrency in airflow.cfg under [core] to set the default per-DAG task concurrency limit for all DAGs.
Use max_active_tasks in DAG code to control concurrency per DAG specifically.
Restart Airflow scheduler after changing airflow.cfg for settings to take effect.
Do not confuse dag_concurrency with max_active_runs_per_dag; they control different limits.
Setting concurrency too high can overload your Airflow workers.