데이터 웨어하우스 및 Airflow 자동화
2025. 10. 6. 23:48ㆍ카테고리 없음
📘 Airflow를 활용한 ETL 자동화: MySQL → S3 → Redshift
데이터 파이프라인 자동화를 위한 Airflow 기반 ETL 구축 튜토리얼
(UI 연동 직전까지 전체 구조 완성)
1️⃣ ETL 개념 정리
ETL은 Extract → Transform → Load 3단계를 거쳐 데이터를 수집하고 정제해 데이터 웨어하우스(DWH)로 옮기는 과정입니다.
2️⃣ 아키텍처 구조
┌───────────┐ ┌────────────┐ ┌────────────┐ ┌──────────────┐
│ MySQL DB │ → │ Extract │ → │ Transform │ → │ Load (S3) │
└───────────┘ └────────────┘ └────────────┘ └──────────────┘
│
↓
┌────────────────┐
│ Redshift (DWH) │
└────────────────┘
Airflow는 이 전체 파이프라인의 스케줄링 및 자동화 오케스트레이션을 담당합니다.
3️⃣ 사용 기술 스택
- Airflow 3.1.0 : ETL 스케줄링 및 자동화
- MySQL : 운영 DB (데이터 원본)
- AWS S3 : 중간 데이터 저장소
- Amazon Redshift : 최종 데이터 웨어하우스
- Python (pandas) : 데이터 전처리
4️⃣ Airflow 프로젝트 구조
~/airflow/
├── airflow.cfg # Airflow 설정 파일
├── dags/ # DAG 파일 저장 폴더
│ └── etl_redshift.py # ETL DAG 정의 파일
├── logs/ # Task 로그
└── airflow.db # Airflow 내부 DB
📌 Airflow는 airflow.cfg의 [core] dags_folder 설정을 지속적으로 감시하며,
해당 폴더의 .py 파일에서 DAG 정의를 자동으로 등록합니다.
5️⃣ .env 환경 설정
IAM=arn:aws:iam:::role/...
6️⃣ ETL DAG 코드 (etl_redshift.py)
📦 Import 및 초기 설정
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
import pandas as pd
from dotenv import load_dotenv
import os
file_path = 'redshift_users.csv'
load_dotenv()
aws_iam = os.getenv("IAM")
🔹 Step 1: Extract (MySQL → CSV)
def extract_mysql(**context):
mysql = MySqlHook(mysql_conn_id='mysql_default')
df = mysql.get_pandas_df('SELECT * FROM users;')
df.to_csv(file_path, index=False)
context['ti'].xcom_push(key='file_path', value=file_path)
🔹 Step 2: Transform (데이터 전처리)
def transform_data(**context):
file_path = context['ti'].xcom_pull(key='file_path', task_ids='extract')
df = pd.read_csv(file_path)
# 데이터 정제
df["name"] = df["name"].str.upper()
df = df.fillna({"country": "UNKNOWN"})
transformed_path = "users_clean.csv"
df.to_csv(transformed_path, index=False)
context['ti'].xcom_push(key="transformed_path", value=transformed_path)
🔹 Step 3: Load (S3 적재)
def load_to_s3(**context):
transformed_path = context['ti'].xcom_pull(key='transformed_path', task_ids='transform')
s3 = S3Hook(aws_conn_id="aws_default")
bucket_name = 'etl-tutorial-of-eddy'
key = 'users/users_clean.csv'
s3.load_file(filename=transformed_path, bucket_name=bucket_name, key=key, replace=True)
context['ti'].xcom_push(key='s3_path', value=f's3://{bucket_name}/{key}')
🔹 Step 4: Copy (Redshift 적재)
def copy_to_redshift(**context):
s3_path = context['ti'].xcom_pull(key='s3_path', task_ids='load')
redshift = PostgresHook(postgres_conn_id='redshift_default')
conn = redshift.get_conn()
cur = conn.cursor()
# 테이블 생성
cur.execute("""
CREATE TABLE IF NOT EXISTS users(
user_id INT,
name VARCHAR(50),
age INT,
country VARCHAR(50)
);
""")
# S3에서 Redshift로 COPY
copy_sql = f"""
COPY users
FROM '{s3_path}'
IAM_ROLE '{aws_iam}'
CSV IGNOREHEADER 1;
"""
cur.execute(copy_sql)
conn.commit()
cur.close()
🔹 DAG 정의 및 의존성 설정
with DAG(
dag_id="etl_redshift",
start_date=datetime(2025, 10, 6),
schedule="@daily",
catchup=False
) as dag:
extract = PythonOperator(task_id="extract", python_callable=extract_mysql)
transform = PythonOperator(task_id="transform", python_callable=transform_data)
load = PythonOperator(task_id="load", python_callable=load_to_s3)
copy = PythonOperator(task_id="copy", python_callable=copy_to_redshift)
# Task 의존성 정의
extract >> transform >> load >> copy
📚 다음 편 예고
👉 Airflow 내부 구조 완전 해부 — Scheduler, Triggerer, DagProcessor의 역할과 상호작용