
How to Use xcom_pull in Airflow: Syntax, Example, and Tips

In Airflow, use xcom_pull to retrieve data that other tasks have pushed via XComs. Call task_instance.xcom_pull(task_ids='task_id') inside a task to get the value pushed by the task with that ID. This lets tasks share small pieces of data during workflow execution.

Syntax

The xcom_pull method is used inside a task's Python callable or operator to fetch data shared by another task. The main parts are:

  • task_ids: The ID of the task you want to pull data from; a list of task IDs is also accepted.
  • key (optional): The specific key of the data to pull; defaults to return_value.
  • dag_id (optional): The DAG ID if pulling from a different DAG.
  • include_prior_dates (optional): Whether to include XComs from previous DAG runs.

Example syntax:

value = task_instance.xcom_pull(task_ids='task_id', key='key_name')
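
To make the parameters above concrete, here is a toy in-memory model of the lookup. FakeXComStore is a hypothetical stand-in written for this illustration and is not Airflow code; it only mimics two behaviors described above: key defaults to return_value, and a lookup that matches nothing yields None.

```python
# Toy model of XCom lookup semantics. FakeXComStore is a hypothetical
# stand-in for illustration only; it is NOT part of Airflow.
from typing import Any, Dict, Tuple

RETURN_KEY = "return_value"  # the default key named in the parameter list


class FakeXComStore:
    def __init__(self) -> None:
        # Values are indexed by (task_id, key), which is how a pull is addressed.
        self._data: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = RETURN_KEY) -> None:
        self._data[(task_id, key)] = value

    def pull(self, task_ids: str, key: str = RETURN_KEY) -> Any:
        # A missing (task_id, key) pair gives None rather than raising.
        return self._data.get((task_ids, key))


store = FakeXComStore()
store.push("extract", {"rows": 3})                   # stored under 'return_value'
store.push("extract", "/tmp/out.csv", key="location")

default_value = store.pull(task_ids="extract")                # default key
named_value = store.pull(task_ids="extract", key="location")  # explicit key
missing_value = store.pull(task_ids="extrct")                 # typo in the task ID
```

The last call shows why a misspelled task ID is easy to miss: nothing fails, the result is simply None.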

Example

This example shows two PythonOperator tasks: one pushes a value using xcom_push, and the other pulls it using xcom_pull. It demonstrates how to share data between tasks in the same DAG.

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_function(ti):
    ti.xcom_push(key='sample_key', value='Hello from push task')

def pull_function(ti):
    pulled_value = ti.xcom_pull(task_ids='push_task', key='sample_key')
    print(f'Pulled value: {pulled_value}')

default_args = {'start_date': datetime(2024, 1, 1)}

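# schedule_interval=None means the DAG runs only when triggered manually.
# Newer Airflow releases (2.4+) name this argument `schedule`.
dag = DAG('xcom_example_dag', default_args=default_args, schedule_interval=None)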

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    dag=dag
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    dag=dag
)

push_task >> pull_task
Output (printed to the pull_task log)
Pulled value: Hello from push task
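
Because both callables only touch their ti argument, they can be exercised without a scheduler. The sketch below repeats the two functions from the example and runs them against StubTaskInstance, a hypothetical test double (not an Airflow class) that records pushes in a shared dictionary. It assumes the callables need nothing from ti beyond xcom_push and xcom_pull.

```python
import contextlib
import io


def push_function(ti):
    ti.xcom_push(key='sample_key', value='Hello from push task')


def pull_function(ti):
    pulled_value = ti.xcom_pull(task_ids='push_task', key='sample_key')
    print(f'Pulled value: {pulled_value}')


class StubTaskInstance:
    """Hypothetical test double; implements only what the callables use."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key='return_value'):
        return self._store.get((task_ids, key))


shared_store = {}
push_function(StubTaskInstance('push_task', shared_store))

# Capture what pull_function prints so the result can be checked.
buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    pull_function(StubTaskInstance('pull_task', shared_store))
captured = buffer.getvalue().strip()
```

This checks only the logic of the callables; it says nothing about scheduling, serialization, or the XCom backend of a real deployment.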

Common Pitfalls

Common mistakes when using xcom_pull include:

  • Passing the wrong task_ids (or key), which makes xcom_pull return None.
  • Pulling a value that was never pushed: the upstream task must call xcom_push or return the value, and it must run before the pulling task.
  • Using xcom_pull outside task context where task_instance is not available.
  • Pulling large data via XCom, which is not recommended as XComs are meant for small messages.

Example of a wrong and right way:

# Wrong: Missing task_ids
value = task_instance.xcom_pull()

# Right: Specify task_ids
value = task_instance.xcom_pull(task_ids='push_task')
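
Since a wrong task ID or key produces None silently, one option is to wrap the pull in a small guard that fails loudly. The sketch below is one possible approach, not an Airflow feature: pull_required and DictTaskInstance are hypothetical names introduced here, and the stub exists only so the helper can be run without Airflow.

```python
def pull_required(ti, task_id, key='return_value'):
    """Pull an XCom and raise if nothing was found.

    pull_required is a hypothetical helper, not an Airflow function.
    """
    value = ti.xcom_pull(task_ids=task_id, key=key)
    if value is None:
        raise ValueError(
            f"No XCom found for task_id={task_id!r}, key={key!r}; "
            "check the task ID, the key, and that the upstream task ran."
        )
    return value


class DictTaskInstance:
    """Hypothetical stub that serves XCom values from a plain dict."""

    def __init__(self, values):
        self._values = values  # (task_id, key) -> value

    def xcom_pull(self, task_ids=None, key='return_value'):
        return self._values.get((task_ids, key))


stub_ti = DictTaskInstance({('push_task', 'return_value'): 42})

found = pull_required(stub_ti, 'push_task')

try:
    pull_required(stub_ti, 'push_tsk')  # misspelled task ID
    typo_detected = False
except ValueError:
    typo_detected = True
```

A task that legitimately pushes None would also trip this check, so the guard only fits pipelines where None is never a valid value.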

Quick Reference

| Parameter | Description | Default |
|---|---|---|
| task_ids | ID (or list of IDs) of the task to pull the XCom from | None (should be set explicitly) |
| key | Key of the XCom value to pull | 'return_value' |
| dag_id | DAG ID if pulling from another DAG | Current DAG |
| include_prior_dates | Include XComs from previous DAG runs | False |

Key Takeaways

  • Use task_instance.xcom_pull(task_ids='task_id') inside a task to get data from another task.
  • Make sure the upstream task has pushed the value, either with xcom_push or by returning it, before pulling.
  • Specify the correct task_ids and key to avoid getting None.
  • XComs are for small data; avoid pushing large objects.
  • xcom_pull must be called within a task context where task_instance is available.
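
One common way to honor the small-data takeaway is to write the large payload somewhere else and pass only a reference through XCom. The sketch below is a minimal illustration under stated assumptions: write_report, read_report, and StubTaskInstance are hypothetical names, and a local temp file is used as the storage, which only works when both tasks run on the same machine. In a distributed setup the path would need to point at shared storage instead.

```python
import json
import os
import tempfile


def write_report(ti):
    """Write a large payload to disk and share only its path via XCom."""
    rows = [{"id": i, "value": i * i} for i in range(1000)]
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as handle:
        json.dump(rows, handle)
    ti.xcom_push(key="report_path", value=path)  # small string, not the data


def read_report(ti):
    """Pull the path, then load the payload from disk."""
    path = ti.xcom_pull(task_ids="write_report", key="report_path")
    with open(path) as handle:
        return len(json.load(handle))


class StubTaskInstance:
    """Hypothetical test double so the sketch runs without Airflow."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self._store.get((task_ids, key))


store = {}
write_report(StubTaskInstance("write_report", store))
row_count = read_report(StubTaskInstance("read_report", store))
pushed_value = store[("write_report", "report_path")]
os.remove(pushed_value)  # clean up the temp file created above
```

Only the file path travels through XCom here; the 1000 rows stay on disk.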