
Azure operators in Apache Airflow - Commands & Configuration

Introduction
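Azure operators in Airflow let a DAG interact with Microsoft Azure services through ready-made task classes. They ship in the Microsoft Azure provider package (apache-airflow-providers-microsoft-azure) and cover tasks such as uploading files to Blob Storage, working with Azure Data Lake, or running containers, so you configure a task with a few parameters instead of writing Azure client code yourself.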
When you want to automate uploading data files to Azure Blob Storage as part of a workflow.
When you need to start or stop Azure virtual machines on a schedule.
When you want to trigger an Azure Function from an Airflow task.
When you need to run a data pipeline that reads from or writes to Azure Data Lake.
When you want to manage Azure resources like databases or containers automatically.
Config File - azure_dag.py
from airflow import DAG
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='azure_blob_example', start_date=days_ago(1), schedule_interval='@daily', catchup=False) as dag:
    # Upload a local file to the container as data/sample.txt.
    upload_blob = LocalFilesystemToWasbOperator(
        task_id='upload_blob',
        file_path='/tmp/sample.txt',
        container_name='example-container',
        blob_name='data/sample.txt',
        wasb_conn_id='azure_blob_default'
    )

    # Delete an older blob from the same container.
    delete_blob = WasbDeleteBlobOperator(
        task_id='delete_blob',
        container_name='example-container',
        blob_name='data/old_sample.txt',
        wasb_conn_id='azure_blob_default'
    )

    # Run the delete only after the upload has succeeded.
    upload_blob >> delete_blob

This Airflow DAG file defines a simple two-task workflow against Azure Blob Storage.

LocalFilesystemToWasbOperator uploads a local file to the specified Azure Blob container. It lives in the provider's transfers module rather than its operators module.

WasbDeleteBlobOperator deletes the specified blob from the container. By default the task fails if the blob does not exist; pass ignore_if_missing=True to treat a missing blob as success.

The DAG runs daily starting from yesterday. Both tasks authenticate through the Airflow connection whose ID is azure_blob_default, passed to each operator as wasb_conn_id.
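The connection ID used above has to exist before the tasks can authenticate. One way to define it, sketched here under assumptions, is an environment variable named AIRFLOW_CONN_<CONN_ID> holding the connection in Airflow's JSON format (Airflow 2.3 or later). The helper names and the placeholder connection string are illustrative and not part of the original DAG, and authenticating through a connection_string entry in extra is only one of the options the wasb connection type accepts.

```python
import json
import os


def conn_env_var_name(conn_id: str) -> str:
    """Return the environment variable Airflow checks for this connection ID."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def wasb_connection_json(connection_string: str) -> str:
    """Serialize a wasb connection in Airflow's JSON connection format."""
    return json.dumps(
        {
            "conn_type": "wasb",
            # Assumption: authenticate with a storage account connection string.
            "extra": {"connection_string": connection_string},
        }
    )


# Placeholder value: substitute the real connection string of the storage account.
PLACEHOLDER_CONNECTION_STRING = (
    "DefaultEndpointsProtocol=https;AccountName=<account>;"
    "AccountKey=<key>;EndpointSuffix=core.windows.net"
)

env_name = conn_env_var_name("azure_blob_default")
os.environ[env_name] = wasb_connection_json(PLACEHOLDER_CONNECTION_STRING)
print(env_name)  # AIRFLOW_CONN_AZURE_BLOB_DEFAULT
```

Setting os.environ only affects the current Python process and its children. In a real deployment the same variable would be exported in the environment of the scheduler and workers, or the connection would be created in the Airflow UI instead.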

Commands
List all available DAGs to verify that the Azure DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
azure_blob_example
example_dag
Trigger the Azure Blob Storage DAG manually to create a run of the upload and delete tasks. The run only starts executing once the DAG is unpaused.
Terminal
airflow dags trigger azure_blob_example
Expected Output
Created <DagRun azure_blob_example @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
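The run ID in that output combines the run type with the run's timestamp. As a minimal sketch, assuming the manual__<ISO 8601 timestamp> shape shown above (the format can differ between Airflow versions), the hypothetical helpers below build such an ID and recover the timestamp from it, which can help when matching CLI output to a run in the UI.

```python
from datetime import datetime, timezone


def manual_run_id(logical_date: datetime) -> str:
    """Build a run ID shaped like the one in the output above (assumed format)."""
    return f"manual__{logical_date.isoformat()}"


def logical_date_from_run_id(run_id: str) -> datetime:
    """Recover the timestamp embedded after the 'manual__' prefix."""
    prefix, _, timestamp = run_id.partition("__")
    if prefix != "manual" or not timestamp:
        raise ValueError(f"not a manual run ID: {run_id!r}")
    return datetime.fromisoformat(timestamp)


run_id = manual_run_id(datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc))
print(run_id)  # manual__2024-06-01T12:00:00+00:00
```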
List all tasks in the Azure Blob Storage DAG to see the upload and delete tasks.
Terminal
airflow tasks list azure_blob_example
Expected Output
delete_blob
upload_blob
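Run the upload_blob task on its own for the given logical date to test uploading a file to Azure Blob Storage. airflow tasks test ignores task dependencies and does not record state in the metadata database, but it does perform the real upload.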
Terminal
airflow tasks test azure_blob_example upload_blob 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:876} INFO - Executing <Task(LocalFilesystemToWasbOperator): upload_blob> on 2024-06-01
[2024-06-01 12:00:01,000] {wasb.py:123} INFO - Uploading file /tmp/sample.txt to container example-container as data/sample.txt
[2024-06-01 12:00:02,000] {taskinstance.py:1050} INFO - Task upload_blob succeeded
Key Concept

If you remember nothing else from Azure operators in Airflow, remember: they wrap calls to Azure services as ready-made Airflow tasks, so a workflow can use an Azure service by setting a few parameters and a connection ID.

Common Mistakes
Not setting up the Azure connection in Airflow before running the operators.
The operators need credentials to access Azure services; without the connection, tasks will fail authentication.
Create a connection of type wasb (Azure Blob Storage) in the Airflow UI or with the airflow connections add command, give it the correct credentials, and pass its connection ID to the operator through wasb_conn_id.
Using incorrect container or blob names in the operator parameters.
Azure Blob Storage will reject invalid names or non-existing containers, causing task failures.
Double-check container and blob names exist and follow Azure naming rules before running the DAG.
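The naming check can be done locally before the DAG runs. The sketch below encodes the Azure container naming rules (3 to 63 characters; lowercase letters, digits, and hyphens only; a letter or digit at both ends; no consecutive hyphens) in a hypothetical helper, is_valid_container_name. It validates the name only and does not confirm that the container exists in the storage account.

```python
import re

# Container naming rules: 3-63 characters, lowercase letters, digits, and
# hyphens only, starting and ending with a letter or digit, and no "--".
_CONTAINER_NAME = re.compile(r"(?!.*--)[a-z0-9][a-z0-9-]{1,61}[a-z0-9]")


def is_valid_container_name(name: str) -> bool:
    """Return True if the name satisfies the container naming rules above."""
    return _CONTAINER_NAME.fullmatch(name) is not None


for candidate in ("example-container", "Example-Container", "my--container"):
    print(candidate, is_valid_container_name(candidate))
```

Of the three candidates, only example-container passes: the second contains uppercase letters and the third contains consecutive hyphens.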
Summary
Define Azure operators in an Airflow DAG to automate tasks like uploading and deleting blobs.
Use airflow CLI commands to list, trigger, and test DAGs and tasks.
Ensure Azure connections are configured properly for authentication.