
TaskFlow API for cleaner XCom in Apache Airflow - Commands & Configuration

Introduction
Airflow's TaskFlow API helps you write workflows where tasks share data without boilerplate code. It solves the problem of passing data between tasks in a clean and readable way.
When you want to pass results from one task to another without using manual XCom push and pull calls.
When you want your workflow code to look like regular Python functions with clear inputs and outputs.
When you want to avoid writing extra code to handle XCom serialization and deserialization.
When you want to improve readability and maintainability of your Airflow DAGs.
When you want to use decorators to define tasks simply and clearly.
Config File - taskflow_dag.py
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(start_date=days_ago(1), schedule_interval='@daily', catchup=False)
def example_taskflow_dag():

    @task
    def extract():
        return {'name': 'Airflow', 'version': 2}

    @task
    def transform(data):
        return f"{data['name']} version {data['version']}"

    @task
    def load(message):
        print(f"Loading message: {message}")

    data = extract()
    message = transform(data)
    load(message)

example_taskflow_dag()

This Python file defines an Airflow DAG using the TaskFlow API.

@dag decorates the main function to define the workflow.

@task decorates Python functions to make them Airflow tasks that automatically handle XComs.

Data flows naturally as function return values and arguments, making the code clean and easy to follow.
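To see why this feels natural, here is a minimal pure-Python sketch (no Airflow required) of the idea behind TaskFlow's automatic XCom handling: each task's return value is serialized and stored under the task's name, and downstream tasks receive it as an ordinary argument. The `xcom_store` dict and `toy_task` decorator are illustrative inventions for this sketch, not Airflow APIs.

```python
import functools
import json

# Toy stand-in for Airflow's XCom backend: return values are
# JSON-serialized and stored under the task's name.
xcom_store = {}

def toy_task(fn):
    """Illustrative decorator: records the return value, like @task does."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        xcom_store[fn.__name__] = json.dumps(result)  # the "xcom_push"
        return result
    return wrapper

@toy_task
def extract():
    return {'name': 'Airflow', 'version': 2}

@toy_task
def transform(data):
    return f"{data['name']} version {data['version']}"

# Data flows as plain function calls; the store fills up as a side effect.
message = transform(extract())
print(message)                 # Airflow version 2
print(xcom_store['extract'])   # {"name": "Airflow", "version": 2}
```

In real Airflow, the serialization, storage, and retrieval all happen inside the scheduler and metadata database, which is exactly the machinery TaskFlow hides from you.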

Commands
List all available DAGs to verify that the new DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
example_taskflow_dag
Trigger the DAG run manually to start the workflow and see the tasks execute.
Terminal
airflow dags trigger example_taskflow_dag
Expected Output
Created <DagRun example_taskflow_dag @ 2024-06-01T00:00:00+00:00: manual__2024-06-01T00:00:00+00:00, externally triggered: True>
List all tasks in the DAG to confirm the tasks are defined correctly.
Terminal
airflow tasks list example_taskflow_dag
Expected Output
extract
transform
load
Run the 'extract' task locally for the given date to test its output and XCom push.
Terminal
airflow tasks test example_taskflow_dag extract 2024-06-01
Expected Output
[2024-06-01 00:00:00,000] {taskinstance.py:876} INFO - Executing <Task(TaskFlow) object: extract> on 2024-06-01
[2024-06-01 00:00:00,100] {taskinstance.py:1123} INFO - Marking task as SUCCESS. dag_id=example_taskflow_dag, task_id=extract, execution_date=2024-06-01
Key Concept

If you remember nothing else from this pattern, remember: TaskFlow API lets you pass data between tasks as simple Python function inputs and outputs without manual XCom handling.
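Building on this, Airflow 2.x also supports `multiple_outputs=True` on `@task`, which splits a returned dict into one XCom per key so downstream tasks can consume individual fields. A sketch, assuming Airflow 2.x (the DAG and task names here are illustrative):

```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(start_date=days_ago(1), schedule_interval=None, catchup=False)
def multiple_outputs_dag():

    # multiple_outputs=True stores each dict key as its own XCom
    @task(multiple_outputs=True)
    def extract():
        return {'name': 'Airflow', 'version': 2}

    @task
    def report(name, version):
        print(f"{name} {version}")

    data = extract()
    # Downstream tasks can subscribe to individual keys
    report(data['name'], data['version'])

multiple_outputs_dag()
```

This keeps the same "plain function" feel while letting each downstream task depend only on the fields it actually needs.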

Common Mistakes
Trying to manually push or pull XComs inside TaskFlow API tasks.
TaskFlow API automatically handles XComs for you, so manual calls cause confusion and errors.
Instead, return values from @task functions and pass them as arguments to other @task functions.
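To make this concrete, here is a hedged sketch contrasting the manual anti-pattern with the TaskFlow pattern (the manual version is shown only to illustrate what to avoid; task names are illustrative):

```python
from airflow.decorators import task

# Anti-pattern: pulling XComs by hand inside a TaskFlow task.
@task
def transform_manual(ti=None):
    data = ti.xcom_pull(task_ids='extract')  # unnecessary with TaskFlow
    return f"{data['name']} version {data['version']}"

# TaskFlow pattern: accept the upstream return value as an argument.
@task
def transform(data):
    return f"{data['name']} version {data['version']}"
```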
Not decorating functions with @task inside the DAG function.
Without @task, Airflow does not treat the function as a task, so no XCom or scheduling happens.
Always use @task decorator on functions that represent tasks.
Defining tasks outside the DAG function decorated with @dag.
Tasks must be defined inside the DAG function to be part of that DAG's context.
Define all @task functions inside the @dag decorated function.
Summary
Use @dag to define your workflow and @task to define tasks as Python functions.
Return values from tasks automatically become XComs and can be passed as inputs to other tasks.
Trigger and test your DAG using airflow CLI commands to verify task execution and data passing.