
Pushing and pulling XCom values in Apache Airflow - Commands & Configuration

Introduction
Tasks in Airflow sometimes need to share small pieces of data. XComs (short for "cross-communications") let one task push a value that another task can pull later in the same DAG run.
Use XComs:
When you want one task to send a result to another task later in the workflow.
When you need to pass a status or a small data value between tasks without using external storage.
When you want to coordinate tasks by sharing simple messages or flags.
When debugging and you want to check what data a task sent to another.
When you want to avoid writing temporary files or databases for small data exchange.
Config File - xcom_example_dag.py
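from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def push_function(ti):
    # Airflow passes the running TaskInstance as `ti`; store a value under an explicit key.
    ti.xcom_push(key='sample_key', value='Hello from push task')


def pull_function(ti):
    # Read the value back using the same key and the ID of the task that pushed it.
    pulled_value = ti.xcom_pull(key='sample_key', task_ids='push_task')
    print(f'Pulled XCom value: {pulled_value}')


default_args = {
    'owner': 'airflow',
    # Fixed start date; days_ago() is deprecated in recent Airflow 2 releases.
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'xcom_push_pull_example',
    default_args=default_args,
    schedule_interval=None,  # manual triggers only; Airflow 2.4+ prefers the name `schedule`
    catchup=False,
)

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    dag=dag,
)

# push_task must finish before pull_task starts, so the value exists when it is pulled.
push_task >> pull_task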

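This DAG defines two PythonOperator tasks. Because each callable declares a parameter named ti, Airflow passes in the running task instance. push_task calls ti.xcom_push to store a value under the key sample_key. pull_task calls ti.xcom_pull with the same key and task_ids='push_task' to read that value, then prints it. The dependency push_task >> pull_task ensures the pull runs only after the push has finished.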
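The push and pull callables are plain Python functions, so their interaction can be checked without a scheduler. The sketch below is an illustration under stated assumptions: FakeTaskInstance is a hypothetical in-memory stand-in, not an Airflow class, and it only mimics the two calls used in this DAG. pull_function also returns the value here so it can be asserted on, which the DAG version does not do.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict mapping (task_id, key) -> value

    def xcom_push(self, key, value):
        # Values are recorded under the ID of the task that pushed them.
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Simplified: a single task ID only; returns None when nothing matches.
        return self._store.get((task_ids, key))


def push_function(ti):
    ti.xcom_push(key="sample_key", value="Hello from push task")


def pull_function(ti):
    pulled_value = ti.xcom_pull(key="sample_key", task_ids="push_task")
    print(f"Pulled XCom value: {pulled_value}")
    return pulled_value  # returned only so this sketch can check it


store = {}
push_function(FakeTaskInstance("push_task", store))
result = pull_function(FakeTaskInstance("pull_task", store))
```

The shared store dictionary plays the role of the place where Airflow keeps XCom values between tasks. Each fake task instance sees the same store, which is why the second task can read what the first one wrote.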

Commands
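This command creates a DAG run that executes the push and pull tasks. New DAGs are paused by default, so if the run stays queued, unpause the DAG first with airflow dags unpause xcom_push_pull_example.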
Terminal
airflow dags trigger xcom_push_pull_example
Expected Output
Created <DagRun xcom_push_pull_example @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
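Check the logs of the push task to confirm it ran and pushed the XCom value. Task logs are also available in the Airflow web UI (open the task instance and select Logs); the airflow tasks logs subcommand shown here is not part of every Airflow CLI release, so fall back to the UI if the command is rejected.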
Terminal
airflow tasks logs xcom_push_pull_example push_task
Expected Output
Running <TaskInstance: xcom_push_pull_example.push_task manual__2024-06-01T12:00:00+00:00 [running]>
[2024-06-01 12:00:01,000] INFO - Pushed XCom key='sample_key' value='Hello from push task'
[2024-06-01 12:00:02,000] INFO - Task succeeded
Check the logs of the pull task to see the pulled XCom value printed.
Terminal
airflow tasks logs xcom_push_pull_example pull_task
Expected Output
Running <TaskInstance: xcom_push_pull_example.pull_task manual__2024-06-01T12:00:00+00:00 [running]>
Pulled XCom value: Hello from push task
[2024-06-01 12:00:03,000] INFO - Task succeeded
Key Concept

If you remember nothing else from this pattern, remember: XComs let tasks share small data by pushing with a key and pulling by that key and task ID.

Common Mistakes
Mistake: Trying to pull an XCom value before the push task has run.
Why it fails: The value does not exist yet, so the pull returns None and any code that relies on it may then fail.
Fix: Set task dependencies so the push task runs before the pull task.
Mistake: Using different keys or task IDs when pushing and pulling XCom values.
Why it fails: The pull finds no matching value and returns None.
Fix: Use the exact same key and the correct task ID when pulling the XCom.
Mistake: Trying to pass large data or complex objects via XCom.
Why it fails: XCom is designed for small data. By default, values are stored in Airflow's metadata database, so large payloads can cause performance issues or errors.
Fix: Use external storage such as databases or files for large data, and use XCom only for small messages.
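The mistakes above can be guarded against with two small helpers. This is a minimal sketch, not an Airflow feature: pull_required, push_small, MAX_XCOM_BYTES, and StubTaskInstance are all hypothetical names introduced for illustration, and the size limit is an arbitrary value chosen for the example. The helpers only rely on the xcom_push and xcom_pull calls used in the DAG.

```python
import json

# Hypothetical limit chosen for this sketch; it is not an Airflow setting.
MAX_XCOM_BYTES = 48 * 1024


def pull_required(ti, task_id, key):
    """Pull an XCom and fail loudly instead of silently continuing with None."""
    value = ti.xcom_pull(task_ids=task_id, key=key)
    if value is None:
        raise ValueError(f"No XCom found for task_id={task_id!r}, key={key!r}")
    return value


def push_small(ti, key, value):
    """Push a value only if its JSON form stays under the chosen size limit."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > MAX_XCOM_BYTES:
        raise ValueError(f"XCom value for key={key!r} is {size} bytes; use external storage")
    ti.xcom_push(key=key, value=value)


class StubTaskInstance:
    """Hypothetical in-memory stand-in for a task instance, used only for this demo."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.values = {}

    def xcom_push(self, key, value):
        self.values[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self.values.get((task_ids, key))


ti = StubTaskInstance("push_task")
push_small(ti, "sample_key", "Hello from push task")
print(pull_required(ti, "push_task", "sample_key"))
```

Raising in pull_required turns a silent None into a visible task failure, which makes a mismatched key or task ID easier to spot in the logs. One trade-off is that a task that legitimately pushes None can no longer be told apart from a missing value.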
Summary
Define a PythonOperator task that pushes a value using ti.xcom_push with a key.
Define another PythonOperator task that pulls the value using ti.xcom_pull with the same key and task ID.
Trigger the DAG and check task logs to verify the push and pull of XCom values.