Create Airflow dag script for Batch Data Pipeline Using Airflow MWAA, glue and AWS Redshift

Muhammad Saipul Rohman
6 min readJan 13, 2023

--

Di artikel ketiga ini akan membahas pembuatan airflow dag script melanjutkan dari artikel bagian kedua di https://msaipulr.medium.com/setup-data-infrastructure-for-batch-data-pipeline-using-airflow-mwaa-glue-and-aws-redshift-6332fbff2f4c.

Artikel ini adalah artikel bagian ketiga dan terakhir. Jika ingin mengetahui dari dari artikel pertama bisa di lihat di https://msaipulr.medium.com/build-batch-data-pipeline-using-airflow-mwaa-glue-and-aws-redshift-9bdd87d2af6e.

Sebelum setup DAG, saya akan setup Cloud9 terlebih dahulu. Apa itu Cloud9? Cloud9 adalah services IDE programming seperti VSCode, EClipse. Bedanya adalah Cloud9 berjalan di AWS EC2 sehingga bisa di gunakan dimanapun yang penting ada internet dan pastinya tidak perlu install ulang library yang di butuhkan berbeda dengan IDE yang di lokal :)

Setup Cloud9

Langkah-langkah membuat Workspace baru di Cloud9 yaitu :

  • Pergi ke Cloud9
  • Klik Create Environment
  • Pilih region terdekat dengan lokasi kalian saat ini agar tidak low latency. Untuk memilih region ada di bagian atas kana sebelah tanda tanya seperti screenshot di bawah ini
  • Input nama environment nya. Isian yang lain di biarkan default kemudian klik Create
  • Tunggu sampai selesai. Jika sudah selesai buka tab baru dan close tab welcome
  • Workspace kalian akan seperti di bawah ini
  • Jika kalian suka dengan thema ini bisa kalian pilih sendiri dengan cara klik View → Themes → Solarized → Solarized Dark di menu cloud9 workspace

Setup DAG

Tahap pertama setup DAG yaitu import library python yang dibutuhkan untuk membuat airfow DAG beserta custom airflow operator yang sudah di sediakan di airflow plugin.

from datetime import timedelta  
import airflow
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import AwsGlueCrawlerOperator

# Custom Operators deployed as Airflow plugins
from awsairflowlib.operators.aws_copy_s3_to_redshift import CopyS3ToRedshiftOperator

Berikutnya saya akan mendefinisikan default set paramater di Airflow DAG. Saya juga akan mendefinisikan nama bucket S3 dimana data dan script akan disimpan

Notes : Ganti value di variabel S3_BUCKET_NAME dengan S3 bucket masing-masing

S3_BUCKET_NAME = "airflow-yourname-bucket"  

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
'retry_delay': timedelta(minutes=2),
'provide_context': True,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
}

Berikutnya adalah membuat objek DAG menggunakan argument default yang sudah di buat sebelumnya dan menetapkan schedule yang akan di jalankan oleh DAG

dag = DAG(  
'data_pipeline',
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
schedule_interval='0 3 * * *'
)

S3 Sensor

Selanjutnya menggunakan s3_prefix_sensor yang tersedia di Apache Airflow untuk menambahkan sebuah task di DAG yang akan menunggu objek tersedia di S3 sebelum di eksekusi.

s3_sensor = S3PrefixSensor(  
task_id='s3_sensor',
bucket_name=S3_BUCKET_NAME,
prefix='data/raw/green',
dag=dag
)

Maka Jika ada objek yang masuk di folder data/raw/green yang ada di S3 bucket maka otomatis akan menjalankan sensor ini

Glue Crawler

Untuk menjalankan Glue Crawler menggunakan operator yang ada di airflow, kalian bisa menggunakan package yang sudah di sediakan oleh Apache Airflow.

Sebelum menambahkan code untuk menjalankan Glue Crawler, buat Crawler nya menggunakan AWS CLI di Cloud9 Workspace. Detail langkah-langkah nya ada di bagian Setup Cloud9.

Pertama dapatkan ARN Glue IAM Service Role. Pergi ke IAM Console — Roles, cari AWSGlueServiceRoleDefault, klik role tersebut dan salin Role ARN nya. Kemudian ganti nomor ARN nya dan nama bucket S3 pada script di bawah ini

aws glue create-crawler \
--name airflow-workshop-raw-green-crawler \
--role arn\:aws\:iam::1111111111111111\:role/AWSGlueServiceRoleDefault \
--database-name default \
--targets "{\"S3Targets\":[{\"Path\":\"s3://airflow-yourname-bucket/data/raw/green\"}]}"

Jalankan script di atas pada terminal Cloud9 Workspace.

Berikutnya tambahkan langkah untuk menjalankan crawler di DAG saat ada objek masuk di S3.

config = {"Name": "airflow-workshop-raw-green-crawler"}

glue_crawler = AwsGlueCrawlerOperator(
task_id="glue_crawler",
config=config,
dag=dag)

Glue ETL Job

Tahap berikutnya kita akan import script job ke Glue di Console AWS. Langkah — langkahnya yaitu :

  • Download job script package di sini dan extract file zip nya
  • Modifikasi script job Glue nyc_raw_to_transform.py(di dalam folder glue) untuk mengganti nama S3 bucket di bagian data sink.
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://airflow-yourname-bucket/data/transformed/green"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://mwaa-demo-buckets/data/transformed/green"}, format = "parquet", transformation_ctx = "datasink4")
  • Copy script job Glue yang sudah di modifikasi sebelum nya ke S3 bucket s3://airflow-yourname-bucket/scripts/glue/
  • Login ke Console AWS Glue kemudian klik Add Job
  • Input name, untuk IAM role pilih AWSGlueServiceRoleDefault
  • Di bagian This job runs, pilih An existing script that you provide
  • Pilih path S3 untuk penyimpanan script python untuk menjalankan job Glue dan folder temporary lalu klik Next
  • Klik Save job and edit script
  • Pastikan step datasink4 yang ada di script python sebelumnya sudah mengarah ke path S3 yang benar
  • Klik Save
  • Tutup editor dengan mengklik X mark

Di bawah ini adalah script untuk menjalankan job Glue yang sudah ada menggunakan airflow operator AwsGlueJobOperator

glue_task = AwsGlueJobOperator(  
task_id="glue_task",
job_name='nyc_raw_to_transform',
iam_role_name='AWSGlueServiceRoleDefault',
dag=dag)

Redshift

Ok kita masuk ke task terakhir yaitu mengcopy data hasil transformasi ke table yang ada di redshift cluster. Sebelum itu, lakukan setup koneksi redshift cluster di Apache Airflow agar Redshift bisa berkomunikasi dengan Airflow. Langkah — langkahnya yaitu :

  • Pergi ke Airflow Web UI(Link nya ada di MWAA console)
  • Klik menu Admin → Connections
  • Klik tanda + untuk membuat koneksi baru
  • Input conn id redshift_default
  • Pilih Postgres di dropdown Conn Type
  • Input Redshift endpoint di Host

Untuk mengetahui Redshift endpoint, pergi ke Redshift console, klik cluster yang sebelumnya di buat dan salin nama endpoint nya. Perhatikan bahwa di pengaturan koneksi Airflow kalian harus tidak mencatumkan port/database dan hanya input endpoint yang akhiran nya adalah .com seperti gambar di bawah ini

  • Input Schema : dev
  • Input Login : admin dan Password input admin password
  • Input Port : 5439 kemudian Klik Save

Berikutnya adalah tambahkan code di bawah ini untuk mencopy data hasil transformasi dari AWS Glue ke Redshift.

Pertama-tama dapatkan AWS Redshift IAM Service role ARN. Pergi ke IAM Console — Roles, cari role AmazonMWAA-workshop-redshift-role lalu klik role tersebut dan copy Role ARN nya.

Ganti iam_role_arn dengan Redshift IAM Role yang sudah kalian copy Role ARN sebelumnya

copy_s3_to_redshift = CopyS3ToRedshiftOperator(
task_id='copy_to_redshift',
schema='nyc',
table='green',
s3_bucket=S3_BUCKET_NAME,
s3_key='data/transformed',
iam_role_arn='arn\:aws\:iam::111111111111\:role/AmazonMWAA-workshop-redshift-role',
copy_options=["FORMAT AS PARQUET"],
dag=dag,
)

DAG Run

Di tahap terakhir ini tambahkan script di bawah ini untuk menjalankan task yang ada di DAG airflow ini.

s3_sensor >> glue_crawler >> glue_task >> copy_s3_to_redshift

Jika sudah selesai, copy python script tersebut ke folder s3://airflow-yourname-bucket/dags.

Setelah upload code dag ke S3, tunggu beberapa menit sebelum DAG tampil di UI Airflow. Kalian bisa akses UI Airflow dengan mengklik link di Console Airflow

Run Airflow DAG

Untuk menjalankan DAG kalian perlu meng-enable DAG nya dengan menggeser ke samping agar bisa enable. Jika sudah di enable DAG nya maka otomatis akan jalan schedulernya.

Jika scheduler nya tidak jalan setelah di enable, kalian bisa menjalankan manual dengan mengklik tombol play yang di tampilkan pada gambar di bawah.

Setelah DAG airflow jalan, DAG Airflow akan menunggu file masuk di path S3 s3://airflow-yourname-bucket/data/raw/green/. Agar DAG airflow berjalan, copy file ke spesifik path tersebut menggunakan command AWS S3 cli di console Cloud9.

Ganti nama bucket S3 di bawah ini sesuai dengan nama bucket S3 kalian

aws s3 cp s3://ee-assets-prod-us-east-1/modules/f8fe356a07604a12bec0b5582be38aed/v1/data/green_tripdata_2020-06.csv s3://airflow-yourname-bucket/data/raw/green/

Ketika file sudah masuk ke path S3 tersebut, task s3_sensor akan berganti status menjadi complete dan eksekusi nya akan lanjut ke task berikutnya.

Jika semua task DAG sudah selesai, maka akan di tandai warna hijau seperti gambar di bawah ini

Untuk data nya juga sudah masuk ke Redshift seperti pada gambar di bawah ini

Untuk full source code nya yaitu bisa di lihat di https://github.com/saipulrx/airflow_in_aws/blob/main/demo_simple_mwaa_redshift.py

--

--

Muhammad Saipul Rohman

Data Engineering, Data Science and Cloud Computing Enthusiast.