from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.utils.dates import days_ago

# EMR cluster configuration passed through to the RunJobFlow API call.
JOB_FLOW_OVERRIDES = {
    'Name': 'example-emr-cluster',
    'ReleaseLabel': 'emr-6.9.0',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            },
        ],
        # Keep the cluster alive so steps can be added after launch.
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    # Default EMR roles; replace with the roles configured in your account.
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

with DAG(
    dag_id='aws_operators_example',
    start_date=days_ago(1),
    schedule_interval='@once',
    catchup=False,
) as dag:
    # Upload a small text object to S3 (uses the default 'aws_default' connection).
    create_s3_object = S3CreateObjectOperator(
        task_id='create_s3_object',
        s3_bucket='my-example-bucket',
        s3_key='example_folder/example_file.txt',
        data='Hello from Airflow AWS S3 Operator!',
    )

    # Run a simple query against Redshift via the 'redshift_default' connection.
    run_redshift_query = RedshiftSQLOperator(
        task_id='run_redshift_query',
        sql='SELECT current_date;',
        redshift_conn_id='redshift_default',
    )

    # Launch the EMR cluster; the operator pushes the new job flow ID to XCom.
    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id='create_emr_cluster',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )

    # Terminate the cluster created above, pulling its job flow ID from XCom.
    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id='terminate_emr_cluster',
        job_flow_id='{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    )

    create_s3_object >> run_redshift_query >> create_emr_cluster >> terminate_emr_cluster
This Airflow DAG file defines a workflow built from AWS provider operators:
- S3CreateObjectOperator: uploads a small text object to an S3 bucket.
- RedshiftSQLOperator: runs a simple SQL query against Redshift.
- EmrCreateJobFlowOperator: creates an EMR cluster with the settings in JOB_FLOW_OVERRIDES and pushes the resulting job flow ID to XCom.
- EmrTerminateJobFlowOperator: terminates that cluster, pulling the job flow ID back out of XCom.
The >> chaining at the end runs the tasks strictly in sequence, automating a common cloud data pipeline.
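Because the cluster is launched with KeepJobFlowAliveWhenNoSteps set to True and no initial steps, real pipelines usually submit work to the cluster before the terminate task runs. A minimal sketch of that pattern using EmrAddStepsOperator and EmrStepSensor follows; the Spark script location (s3://my-example-bucket/scripts/job.py) and step name are placeholders, not part of the original DAG, and the tasks would be defined inside the same with DAG block:

from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Placeholder step definition: runs a PySpark script stored in S3 via command-runner.jar.
SPARK_STEPS = [
    {
        'Name': 'example_spark_step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster',
                     's3://my-example-bucket/scripts/job.py'],
        },
    },
]

# Submit the step to the cluster created by create_emr_cluster; the step IDs are pushed to XCom.
add_emr_step = EmrAddStepsOperator(
    task_id='add_emr_step',
    job_flow_id='{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    steps=SPARK_STEPS,
)

# Wait for the submitted step to finish before the cluster is terminated.
wait_for_emr_step = EmrStepSensor(
    task_id='wait_for_emr_step',
    job_flow_id='{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    step_id='{{ task_instance.xcom_pull(task_ids="add_emr_step", key="return_value")[0] }}',
)

# These tasks would slot in between cluster creation and termination:
# create_emr_cluster >> add_emr_step >> wait_for_emr_step >> terminate_emr_cluster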