기존에 다른 배치 프로그램을 사용하다가 airflow로 넘어오면서 뭐 같은 workflow 프로그램 이겠구나 싶었다. 근데 airflow만의 문법이나 기능? 개념? 등을 새롭게 접하면서 단순히 똑같은 배치 프로그램이라고 생각하고 넘어가기 보다는 찾아보고 정리하는 부분이 필요하다고 느꼈다.
그리고 airflow만의 시간에 대한 공부없이 냅다 dag를 만들어서 구현하려다 보니 너무 고생했어서 여기다 같이 정리 하겠다 ㅇㅅㅇ.
1.airflow란?
Apache Airflow™는 python 코드로 워크플로우(workflow)를 개발하고, 스케줄링 및 모니터링하기 위한 오픈 소스 플랫폼. DAG(Directed Acyclic Graph, 유향 비순환 그래프): python으로 작성한 workflow로 이를 통해 더 정교한 task dependency를 정의할 수 있다.
1-1.구성
- Web server: python flask(웹 서비스 제공하는 프레임 워크)로 구현, 웹 UI 서버
- Scheduler: 모든 DAG와 Task 모니터링 및 관리, 실행해야할 Task를 스케쥴링
- Worker: 실제 Task를 실행하는 주체, Executor 종류에 따라 동작 방식 다양
- Database: DAG와 스케쥴링 정보 DB(기본 SQLite), airflow에 존재하는 DAG와 Task의 메타데이터를 저장하는 데이터 베이스
1-2.작동방식
- Airflow 작동방식
- 1) python으로 작성된 DAG 읽고
- 2) 거기에 맞춰 Scheduler가 Task를 스케줄링
- 3) Worker가 Task를 가져가 실행
- 해당 실행 상태는 DB에 저장되고
- Web UI를 통해 Task의 성공여부, 실행상태등 확인 가능
2.DAG(Directed Acyclic Graph)
Airflow에서는 ETL(Etract Transform Load)를 부르는 명칭으로 실행하고 싶은 Task간의 관계와 dependency를 포함하고 있는 Task 모음. 어떤 순서, dependency, 스케줄에 따라 실행할지 정보 제공. airflow 공식 문서에서 dag를 선언하는 세가지 문법 제시.
- DAG 예시 코드
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
@task()
def airflow():
print("airflow")
# Set dependencies between tasks
hello >> airflow()
- ”demo"라는 이름의 DAG는 2022년 1월 1일에 시작하여 하루에 한 번 실행하는 워크플로우
- 두가지 task로 구성
- Bash 스크립트를 실행하는 Bash Operator와
- ariflow() Python 함수
- “>>” 등의 표현으로 task간 종속성을, 순서를 제어
2-1.Dag, Task 기본정보
- dag_id: DAG 이름
- start_date: 스케쥴 시작 시간
- schedule: DAG 스케쥴 시간, crontab 문법을 따름
- 0 * * * *: 매시 0 분에 실행되는 DAG, 1시간에 한번
- 0 12 * * * : 매일 한번 12:00에 실행되는 DAG, 하루에 한번
- task_id: DAG내 각 task에 대한 이름
- retries: 실패시 몇 번 실행
*참고: Airflow 시간은 UTC 기준으로 한국의 시간과 9시간 차이가 난다. 따라서, start_date에 맞게 DAG가 실행되기 위해선 추가 설정이 필요
- 환경변수 AIRFLOW__CORE__DEFAULT_TIMEZONE를 Asia/Seoul로 설정
- airflow.cfg 파일에서 default_timezone = utc 부분을 default_timezone = Asia/Seoul으로 수정
- DAG파일에서 pendulum 패키지를 이용해 시간을 변경.
import pendulum
from datetime import datetime
local_tz = pendulum.timezone("Asia/Seoul")
init_args = {
'start_date' : datetime(2020, 2, 28, 2, tzinfo=local_tz)
}
2-2.Operator
operator나 sensor가 각각 하나의 task로 구성되는데 airflow 자체에서 다양한 operator 제공
- SparkSubmitOperator: Spark를 실행하기 위한, 내부적으로 spark binary인 spark-submit 을 호출하는 방식으로 spark을 실행하고 있다. 그러므로 Apache spark 에서 각자의 환경에 알맞은 spark을 각 airflow worker에 다운로드 한후 다운로드 경로를 $SPARK_HOME을 환경변수로 등록하고 $SPARK_HOME/bin 을 path에 추가.
- yarn에 spark job을 동작시킬것임으로 HADOOP_HOME(Hadoop Binary), HADOOP_CONF_DIR(Hadoop 설정파일) 등을 설정
- BashOperator: bash command 실행
- PythonOperator: Python 함수 실행
- EmailOperator: Email 발송
- MySqlOperator: sql 쿼리 수행
- Sensor: 시간, 파일 등을 기다리는 센서
- reference
'Data Engineering > airflow' 카테고리의 다른 글
airflow에서 start_time, execution_time, backfill, catchup (0) | 2023.08.04 |
---|---|
airflow DAG 결과를 Slack API로 메세지 받아보기 (0) | 2023.07.13 |