0
0
Apache Airflowdevops~5 mins

AWS operators (S3, Redshift, EMR) in Apache Airflow - Commands & Configuration

Choose your learning style9 modes available
Introduction
AWS operators in Airflow help automate tasks like moving files to S3, running queries on Redshift, and managing EMR clusters. They solve the problem of manually handling cloud data workflows by letting you schedule and control these tasks easily.
When you want to upload or download files automatically to Amazon S3 as part of your data pipeline.
When you need to run SQL queries or load data into Amazon Redshift without manual intervention.
When you want to start, monitor, and terminate Amazon EMR clusters for big data processing jobs.
When you want to chain cloud data tasks in a workflow that runs on a schedule.
When you want to track success or failure of AWS tasks inside Airflow for better monitoring.
Config File - aws_operators_dag.py
aws_operators_dag.py
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.utils.dates import days_ago

JOB_FLOW_OVERRIDES = {
    'Name': 'example-emr-cluster',
    'ReleaseLabel': 'emr-6.9.0',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
}

with DAG(
    dag_id='aws_operators_example',
    start_date=days_ago(1),
    schedule_interval='@once',
    catchup=False,
) as dag:

    create_s3_object = S3CreateObjectOperator(
        task_id='create_s3_object',
        s3_bucket='my-example-bucket',
        s3_key='example_folder/example_file.txt',
        data='Hello from Airflow AWS S3 Operator!',
    )

    run_redshift_query = RedshiftSQLOperator(
        task_id='run_redshift_query',
        sql='SELECT current_date;',
        redshift_conn_id='redshift_default',
    )

    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id='create_emr_cluster',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )

    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id='terminate_emr_cluster',
        job_flow_id='{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    )

    create_s3_object >> run_redshift_query >> create_emr_cluster >> terminate_emr_cluster

This Airflow DAG file defines a workflow using AWS operators:

  • S3CreateObjectOperator: uploads a text file to an S3 bucket.
  • RedshiftSQLOperator: runs a simple SQL query on Redshift.
  • EmrCreateJobFlowOperator: creates an EMR cluster with specified settings.
  • EmrTerminateJobFlowOperator: terminates the EMR cluster after use.

The DAG runs these tasks in order, automating a common cloud data pipeline.

Commands
List all available DAGs to confirm the new AWS operators DAG is recognized by Airflow.
Terminal
airflow dags list
Expected OutputExpected
aws_operators_example example_dag
Trigger the AWS operators DAG manually to start the workflow immediately.
Terminal
airflow dags trigger aws_operators_example
Expected OutputExpected
Created <DagRun aws_operators_example @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the aws_operators_example DAG to see the steps involved.
Terminal
airflow tasks list aws_operators_example
Expected OutputExpected
create_s3_object run_redshift_query create_emr_cluster terminate_emr_cluster
Test the create_s3_object task for the given date to verify it uploads the file to S3 correctly.
Terminal
airflow tasks test aws_operators_example create_s3_object 2024-06-01
Expected OutputExpected
[2024-06-01 12:00:00,000] {s3.py:123} INFO - Uploading data to s3://my-example-bucket/example_folder/example_file.txt [2024-06-01 12:00:01,000] {s3.py:130} INFO - Upload successful
Key Concept

If you remember nothing else from this pattern, remember: AWS operators in Airflow let you automate cloud tasks like file uploads, queries, and cluster management as part of scheduled workflows.

Common Mistakes
Not setting up AWS connection IDs in Airflow before using AWS operators.
The operators need valid AWS credentials configured in Airflow to connect and perform actions; without them, tasks fail with authentication errors.
Configure AWS connections in Airflow UI or CLI with proper access keys and region before running AWS operator tasks.
Using incorrect or missing S3 bucket names or Redshift connection IDs in operator parameters.
The operators cannot find the target resources and will error out or silently fail.
Double-check bucket names, Redshift connection IDs, and other parameters for accuracy before running the DAG.
Not handling EMR cluster job flow IDs properly when terminating clusters.
If the terminate task does not get the correct cluster ID from the create task, the cluster will not be terminated, causing extra costs.
Use XComs to pass the cluster ID from the create task to the terminate task as shown in the example.
Summary
Define AWS operators in an Airflow DAG to automate S3 uploads, Redshift queries, and EMR cluster management.
Use airflow CLI commands to list, trigger, and test the DAG and its tasks.
Ensure AWS connections and parameters are correctly set to avoid authentication and resource errors.