How to Use xcom_pull in Airflow: Syntax, Example, and Tips
In Airflow, use xcom_pull to retrieve data that other tasks pushed via XComs. Call task_instance.xcom_pull(task_ids='task_id') inside a task to get the value that task shared. This lets tasks communicate and share small pieces of data during workflow execution.
Syntax
The xcom_pull method is called on the task instance, either inside a task's Python callable or in a templated operator field, to fetch data shared by another task. The main parameters are:
- task_ids: The ID of the task you want to pull data from (a list of task IDs is also accepted).
- key (optional): The specific key of the data to pull; defaults to return_value.
- dag_id (optional): The DAG ID if pulling from a different DAG; defaults to the current DAG.
- include_prior_dates (optional): Whether to also include XComs from previous DAG runs; defaults to False.
Example syntax:
```python
value = task_instance.xcom_pull(task_ids='task_id', key='key_name')
```
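To make the parameters concrete, here is a pure-Python toy model of the filtering xcom_pull performs. It is not Airflow's implementation: toy_xcom_pull, the Store record layout, and the integer run numbers are hypothetical stand-ins for the metadata-database rows Airflow actually queries. It assumes Airflow 2.x semantics: key defaults to return_value (the key under which a PythonOperator callable's return value is pushed), dag_id=None means the calling task's DAG, and include_prior_dates=True lets earlier runs match when the current run has nothing. Returning a list for several task_ids is a simplification; the exact container Airflow returns has varied between versions.

```python
from typing import Any, Dict, Iterable, Optional, Tuple, Union

RETURN_KEY = "return_value"  # default key, mirroring Airflow's XCOM_RETURN_KEY

# Hypothetical toy record layout: (dag_id, run_number, task_id, key) -> value.
# Pushing the same key again in the same run overwrites, as a dict would.
Store = Dict[Tuple[str, int, str, str], Any]


def toy_xcom_pull(
    store: Store,
    current_dag: str,
    current_run: int,
    task_ids: Union[str, Iterable[str], None] = None,
    dag_id: Optional[str] = None,
    key: str = RETURN_KEY,
    include_prior_dates: bool = False,
) -> Any:
    """Toy model of xcom_pull's filtering; not Airflow's implementation."""
    dag = dag_id or current_dag  # dag_id=None means "the calling task's DAG"

    def latest(task_id: Optional[str]) -> Any:
        hits = [
            (run, value)
            for (d, run, t, k), value in store.items()
            if d == dag
            and k == key
            and (task_id is None or t == task_id)
            and (run <= current_run if include_prior_dates else run == current_run)
        ]
        # Newest run wins, so older runs only matter when the current run has no match.
        return max(hits, key=lambda hit: hit[0])[1] if hits else None

    if task_ids is None or isinstance(task_ids, str):
        return latest(task_ids)  # one task (or no filter): a single value or None
    return [latest(t) for t in task_ids]  # several tasks: one entry per ID, in order


store: Store = {
    ("etl", 1, "extract", RETURN_KEY): 100,  # run 1: a callable's return value
    ("etl", 2, "push_task", "sample_key"): "Hello from push task",
    ("etl", 2, "count_a", RETURN_KEY): 3,
    ("etl", 2, "count_b", RETURN_KEY): 4,
}

print(toy_xcom_pull(store, "etl", 2, task_ids="push_task", key="sample_key"))  # Hello from push task
print(toy_xcom_pull(store, "etl", 2, task_ids=["count_a", "count_b"]))  # [3, 4]
print(toy_xcom_pull(store, "etl", 2, task_ids="extract"))  # None: only run 1 pushed it
print(toy_xcom_pull(store, "etl", 2, task_ids="extract", include_prior_dates=True))  # 100
```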
Example
This example shows two PythonOperator tasks: one pushes a value using xcom_push, and the other pulls it using xcom_pull. It demonstrates how to share data between tasks in the same DAG.
```python
# Import paths assume Airflow 2.x.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def push_function(ti):
    # Airflow passes the task instance as `ti` because the callable asks for it.
    ti.xcom_push(key='sample_key', value='Hello from push task')


def pull_function(ti):
    # Pull the value that push_task stored under 'sample_key'.
    pulled_value = ti.xcom_pull(task_ids='push_task', key='sample_key')
    print(f'Pulled value: {pulled_value}')


default_args = {'start_date': datetime(2024, 1, 1)}

# `schedule` replaces the older `schedule_interval` argument (Airflow 2.4+).
dag = DAG('xcom_example_dag', default_args=default_args, schedule=None)

push_task = PythonOperator(task_id='push_task', python_callable=push_function, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=pull_function, dag=dag)

# pull_task must run after push_task, or there is nothing to pull yet.
push_task >> pull_task
```
Output (printed to the pull_task log)
Pulled value: Hello from push task
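The pull above finds the value only because both task_ids and key match what push_task stored. The toy lookup below is plain Python rather than Airflow code (run_xcoms and toy_pull are hypothetical names); it mimics that filtering for a single DAG run to show how a missing key or a misspelled task ID quietly yields None instead of an error. With no task_ids filter, the toy simply returns the first record whose key matches.

```python
from typing import Any, Dict, Optional, Tuple

RETURN_KEY = "return_value"  # default key, mirroring Airflow's XCOM_RETURN_KEY

# Hypothetical single-run store: (task_id, key) -> value, filled as in the example DAG.
run_xcoms: Dict[Tuple[str, str], Any] = {
    ("push_task", "sample_key"): "Hello from push task",
}


def toy_pull(task_ids: Optional[str] = None, key: str = RETURN_KEY) -> Any:
    """Toy single-run lookup: first value whose task and key match, else None."""
    for (task_id, stored_key), value in run_xcoms.items():
        if stored_key == key and (task_ids is None or task_id == task_ids):
            return value
    return None


print(toy_pull(task_ids="push_task", key="sample_key"))  # Hello from push task
print(toy_pull(task_ids="push_task"))  # None: key defaults to return_value
print(toy_pull(task_ids="push_tsk", key="sample_key"))  # None: typo in task_ids
```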
Common Pitfalls
Common mistakes when using xcom_pull include:
- Not specifying the correct task_ids, resulting in None values.
- Forgetting to push data with xcom_push (or to return it from the callable) before pulling.
- Using xcom_pull outside a task context, where task_instance is not available.
- Pulling large data via XCom, which is not recommended: XComs are stored in Airflow's metadata database by default and are meant for small messages.
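One way to act on the size advice is to check a value before pushing it. This is a plain-Python sketch, not an Airflow API: check_xcom_payload and the 48 KB MAX_XCOM_BYTES budget are hypothetical, and it uses the standard json module as a conservative stand-in for XCom serialization (Airflow 2.x serializes XComs as JSON by default, and newer releases accept a few extra types). Values that fail the check are better written to external storage, with only a reference such as a file path or URI pushed to XCom.

```python
import json
from typing import Any

# Hypothetical budget for a single XCom value; tune it for your deployment.
MAX_XCOM_BYTES = 48 * 1024


def check_xcom_payload(value: Any, max_bytes: int = MAX_XCOM_BYTES) -> int:
    """Hypothetical helper: return the JSON size of value in bytes, or raise ValueError."""
    try:
        encoded = json.dumps(value).encode("utf-8")
    except TypeError as exc:  # e.g. sets, open file handles, DataFrames
        raise ValueError(f"value is not JSON-serializable: {exc}") from exc
    if len(encoded) > max_bytes:
        raise ValueError(
            f"value is {len(encoded)} bytes; push a reference (path or URI) instead"
        )
    return len(encoded)


# Small metadata (an illustrative path plus a row count) passes the check...
print(check_xcom_payload({"rows": 1200, "path": "/tmp/extract.csv"}))

# ...while a large payload is rejected in favor of passing a reference.
try:
    check_xcom_payload(list(range(100_000)))
except ValueError as err:
    print(err)
```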
Example of the wrong and the right way:
```python
# Wrong: no task_ids, so the pull is not tied to push_task and may return
# another task's value or None
value = task_instance.xcom_pull()

# Right: name the task (and the key it pushed under, if not return_value)
value = task_instance.xcom_pull(task_ids='push_task', key='sample_key')
```
Quick Reference
| Parameter | Description | Default |
|---|---|---|
| task_ids | ID (or list of IDs) of the task(s) to pull XCom from | None (always set it explicitly) |
| key | Key of the XCom value to pull | 'return_value' |
| dag_id | DAG ID if pulling from another DAG | Current DAG |
| include_prior_dates | Include XComs from previous DAG runs | False |
Key Takeaways
Use task_instance.xcom_pull(task_ids='task_id') inside a task to get data from another task.
Make sure the upstream task pushes the data (with xcom_push or by returning it) and runs before the task that pulls it.
Specify the correct task_ids and key to avoid getting None.
XComs are for small data; avoid pushing large objects.
xcom_pull must be called within a task context where task_instance is available.