Airflow 튜토리얼 (1)

성연찬's avatar
May 12, 2024
Airflow 튜토리얼 (1)
데이터 추출, 가공, 저장, 분석 파이프라인을 자동화하기 위해 위해 Airflow를 빠르게 배워보고자 강의를 시청했다. 강의는 Windows OS 기준으로 진행되었지만, 본 글은 Mac OS를 사용해 진행했다. 필요한 개념과 몇몇 이슈들을 해결한 과정을 함께 정리한다.

1. 환경설정

  • Prerequisite:
    • docker (airflow를 docker로 설치하는 방법으로 진행)
    • docker compose
    • vscode
  • 작업 폴더 내에 Dockerfile을 아래와 같이 작성한 후 이미지 빌드
    • 커맨드: docker build -t airflow-tutorial .
FROM apache/airflow:latest-python3.12 USER root RUN apt-get update && \\ apt-get -y install git && \\ apt-get clean USER airflow ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages" ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
  • docker-compose.yaml 을 아래와 같이 작성하고 compose up
    • 커맨드: docker compose -f "docker-compose.yaml" up -d --build
version: '3' services: airflow_tutorial: image: airflow-tutorial:latest volumes: - ./airflow:/opt/airflow ports: - "8080:8080" command: airflow standalone
  • http://localhost:8080/ 로 접속해 로그인
    • ID: admin
    • PW: standalone_admin_password.txt 파일에 적혀있음
  • 종료: docker compose down

2. DAG 기초

  • alrflow/dags/welcome_dag.py 파일 생성 후 아래와 같이 작성
from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import requests def print_welcome(): print('Welcome to Airflow!') def print_date(): print('Today is {}'.format(datetime.today().date())) def print_random_quote(): response = requests.get('<https://api.quotable.io/random>') quote = response.json()['content'] print('Quote of the day: "{}"'.format(quote)) dag = DAG( 'welcome_dag', default_args={'start_date': days_ago(1)}, schedule_interval='0 23 * * *', # 매일 오후 11시에 DAG 실행 catchup=False ) print_welcome_task = PythonOperator( task_id='print_welcome', python_callable=print_welcome, dag=dag ) print_date_task = PythonOperator( task_id='print_date', python_callable=print_date, dag=dag ) print_random_quote = PythonOperator( task_id='print_random_quote', python_callable=print_random_quote, dag=dag ) # Set the dependencies between the tasks print_welcome_task >> print_date_task >> print_random_quote
  • Airflow가 실행되어있는 상태애서 dag를 추가하고 조금 시간이 지나면 UI에서 DAG 확인 가능
    • UI 상에서 직접 트리거 가능
    • UI의 Logs 에서 결과 확인

3. Provider VS Operator VS Hooks

  • Coverage: Provider ⊃ Operator, Hooks
  • Operator: DAGs 내에서 개별 작업을 정의하고, 재사용 가능한 작업 단위를 제공
    • Action: 특정 작업을 실행
      • ex) PythonOperator, BashOperator 등
    • Transfer: 데이터를 옮김
      • ex) S3ToRedshiftOperator, LocalFilesystemToGCSOperator, S3ToGCSOperator 등
    • Sensor: 특정 조건이 충족될 때까지 대기
      • ex) S3KeySensor, RedshiftClusterSensor 등
    • 참고: Operator는 Blueprint (Python의 Class), Task는 Implementation (Python의 Object)에 해당
    • Operator 사용 예시
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator with DAG(dag_id='bash_operator_dag', start_date=datetime (2023, 8, 1), schedule_interval="@daily") as dag: task1 = BashOperator ( task_id='command_example', bash_command='echo "Airflow is running!"* ) task2 = BashOperator( task_id='execute_script', bash_command='/path/to/your/script.sh', env={'ENV_VAR': 'value'} ) first_task >> second_task
  • Hook: 외부 시스템과의 인터페이스를 추상화하는 클래스
    • Hook 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook pg_hook = PostgresHook(conn_id='my_postgres_connection') result = pg_hook.get_records 'SELECT * FROM table')

4. Task, DAGs, airflow.cfg

  • airflow.cfg 파일에서 airflow 운용을 위한 전반적인 설정을 커스텀할 수 있음
    • 예를 들어 smtp 서버나 dags folder, plugins folder 같은 것들
  • Tasks 간 dependency를 만드는 방법 4 가지
# 1. Bitshift operations (>>, <<) task1 >> task2 >> task3 # 2. set-upstream and set-downstream function task1.set_upstream(task2) task3.set_downstream(task2) # 3. Chain funetion: chain(task1, task2, task3) # 4. TaskFlow API: dependencies are automatically inferred based on the sequence of task function calls) task1() task2() task3()
  • dags/exchange_rate_pipeline.py 파일 작성 (여러 Operator 활용해보는 목적)
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta from clean_data import clean_data # Define or Instantiate DAG dag = DAG( 'exchange_rate_etl', start_date=datetime(2023, 10, 1), end_date=datetime(2023, 12, 31), schedule_interval='0 22 * * *', default_args={"retries": 2, "retry_delay": timedelta(minutes=5)}, catchup=False ) # Define or Instantiate Tasks download_task = BashOperator( task_id='download_file', bash_command='curl -o xrate.csv <https://data-api.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata>', cwd='/tmp', dag=dag, ) clean_data_task = PythonOperator( task_id='clean_data', python_callable=clean_data, dag=dag, ) send_email_task = EmailOperator( task_id='send_email', to='sleekdatasolutions@gmail.com', subject='Exchange Rate Download - Successful', html_content='The Exchange Rate data has been successfully downloaded, cleaned, and loaded.', dag=dag, ) # Define Task Dependencies download_task >> clean_data_task >> send_email_task
  • airflow/plugins/hooks/clean_data.py 파일 작성
import os import pandas as pd from airflow.hooks.base_hook import BaseHook class CleanDataHook(BaseHook): def __init__(self, path='/tmp/xrate.csv'): self.path = path def clean_data(self): data = pd.read_csv(self.path, header=None) default_values = { int: 0, float: 0.0, str: '', } cleaned_data = data.fillna(value=default_values) now = pd.Timestamp.now() year = now.year month = now.month day = now.day data_dir = f'/opt/airflow/data/xrate_cleansed/{year}/{month}/{day}' os.makedirs(data_dir, exist_ok=True) cleaned_data.to_csv(f'{data_dir}/xrate.csv', index=False)
  • dockerfile 수정
FROM apache/airflow:latest-python3.12 USER root RUN apt-get update && \\ apt-get -y install git && \\ apt-get clean USER airflow COPY requirements.txt /tmp/requirements.txt RUN pip install -r /tmp/requirements.txt ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages" ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
  • docker-compose.yaml 수정
version: '3' services: airflow_tutorial: image: airflow-tutorial:latest volumes: - ./airflow:/opt/airflow ports: - "8080:8080" command: airflow standalone my_smtp: image: ixdotai/smtp restart: always
  • airflow.cfg 수정
... smtp_host = my_smtp ...
  • requirements.txt 작성
pandas
  • smtp가 포함된 DAG를 실행시킨 후 docker compose down 시 리소스를 계속 사용 중이라는 메시지가 나오므로, 테스트 시 리소스까지 리셋하고 싶다면 compose up 시 다음과 같이 옵션 주기
    • docker compose -f "docker-compose.yaml" up -d --build --remove-orphans
 

(참고) 이슈 리스트

강의를 따라가다가 이슈가 있었던 부분을 아래와 같이 해결
  • bytemark/smtp 이미지가 Mac OS(ARM)에서 동작하지 않음
    • ixdotai/smtp 이미지로 대체

References

Share article
RSSPowered by inblog