Data Engineering/airflow

airflow DAG 결과를 Slack API로 메세지 받아보기

히또아빠 2023. 7. 13. 17:23

기존에 Jenkins를 이용해 코드 배치관리를 진행했는데 코드 관리나 배치 관리 복잡도가 증가하면서 모델별로 하나의 DAG로 관리하고 체계적인 system화를 위해 airflow로 배치를 옮기고 있다. 거기다 기존의 hive sql을 pyspark로 구현해서 정리까지 하고있는데 병아리 수준에서 시스템, 코드를 뜯어보고 새롭게 구성하려고 하니 에러 투성이다 ㅠ

 

Slack에서 제공되는 API를 활용하여 구성해둔 DAG가 정상 작동 했는지 메세지를 전달 받아 모니터링을 효율적으로 하는 목적으로 해당 페이지를 작성했다. 참고로 Slack과 airflow는 설치 및 세팅이 돼 있다는 가정하에 작성했고 DAG 구성에 대한 디테일한 설명은 없다. 추후에 공부하고 정리해서 따로 올리려고 한다.

[목차]


1.airflow DAG 의 Callbacks

Callbacks — Airflow Documentation

 

Callbacks — Airflow Documentation

 

airflow.apache.org

airflow는 DAG의 Task 결과에 따라 메세지를 호출할 수 있는 콜백 기능이 있다. 이러한 콜백 메세지는 작업 로그가 아닌 스케줄러 로그에 표시되며 $AIRflow_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log 에서 확인 가능하다.

1-1.Callback Types

콜백은 다음과 같이 5개의 타입으로 구성되어 있다. 아래의 예시는 DAG내에서 성공 및 실패 알람 옵션을 지정해준 예이다. Slack API 와 연동을 통해 슬랙 채널에서 DAG 결과를 전달 받는 방식에 대해서 정리.

on_success_callback
task 성공시 호출
on_failure_callback
task 실패시 호출
sla_miss_callback
task가 정해진 SLA(service level agreement) 충족을 못할 때 호출
on_retry_callback
task 재실행시 호출
on_execute_callback
task 실행전에 호출
  • 예시 code
def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")

def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")

with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=dag_success_alert,
    on_failure_callback=task_failure_alert,
    tags=["example"],
)

2.Slack setting

DAG의 task 결과를 슬랙에서 받기위해서 필요한것은 1) 어떤 채널에 어떻게 들어갈 것인지 2) 어떤 메세지를 보낼 것인지.

2-1.Slack app 생성 및 설정

해당 사이트에서 오른쪽 상단에 'Your apps > Create New App' 앱을 생성한다. 기존의 app이 없는 경우에는 'Create an App' 이라는 문구가 뜬다.

create app을 클릭하고 나면 아래와 같은 페이지가 나온다. 해당 페이지에서 'From Scratch'를 클릭한다. 그러면 app의 이름과 해당 app을 이용할 slack workspace를 선택한다.
나는 회사에서 운영하는 workspace를 선택했고, 슬랙이 처음이라면 해당 페이지에서 workspace를 생성한 후 진행하면 된다. 결론적으로 해당 workspace에 내가 원하는 채널에다가 코드 결과를 보낼 수 있다.

 

2-2.Slack app 권한 설정 및 token 발행

앱 생성후 다음과 같은 화면에서 이후에 메세지를 받기 위한 일종의 key인 token발행을 위해서 아래와 같은 권한 페이지에서 우리가 만든 앱에 대한 권한을 지정해 준다.

슬랙으로 메세지를 받기 위해서는 OAuth Scope 에서 'chat:write' 를 반드시 추가하고 그 외 설정들은 https://api.slack.com/scopes

 

Permission scopes

The capabilities and permissions of Slack apps are governed by named scopes.

api.slack.com

해당 페이지에서 확인 할 수 있다. 채널을 private하게 생성한 경우라면 Scope에 'groups:read' 를 추가하면 된다.

해당 scope 설정을 하고 'install to Workspace' 를 클릭하면 OAuth Tokens을 발급 받을 수 있다. 이 Token은 일종의 인증 key로 우리가 만든 채널에 app이 접근할 수 있도록 한다.

slack 봇의 token은 'xoxb-' 로 시작하는데 해당 key는 집열쇠처럼 잘 보관해두자.

 

2-3.Slack 채널에 앱 등록 및 채널 ID 확인

우리가 만든 앱을 채널에 등록하는 단계이다. 우선, 기존에 만들어둔 채널이 있다는 가정하에 우클릭후 '채널의 세부정보 보기' 로 들어간다. 그리고 '통합'  탭에서 '앱 추가'를 진행한다.  검색 기능을 통해 앞서 2-1 단계에서 만든 'test'라고 만든 앱을 채널에 추가한다.

 

 
 

지금까지 메세지를 보내고자하는 채널에 앱을 등록했고 token을 발급 받았다.

https://api.slack.com/methods/conversations.list/test 해당 사이트에서 key를 주고 채널의 ID를 받아온다.
xoxb로 시작하던 token값을 입력하고 'test method' 클릭하면 오른쪽에 API response를 반환해 준다. 나중에 airflow 메세지를 보낼때 채널명이나 채널 id가 필요한 경우가 있어 해당 내용을 저장해둔다.
참고로 공유 채널이 아니라 pivate 채널인 경우에는 앞서 scope 설정시 group:read를 설정해주고 type에 private_channel을 적어준다.

 

conversations.list API method

Lists all channels in a Slack team.

api.slack.com

슬랙에서의 모든 준비는 끝났다.

3.Airflow에서 Slack Bot 활용

위에서 발급받은 token 정보와 메세지를 받고자하는 채널 정보만 있으면 DAG의 Callback 기능을 활용하여 task 결과를 슬랙 채널에서 받아볼 수 있다. airflow 웹을 접속하지 않고도 간편하게 결과를 받을 수 있다는 점에서 장점이 있고 메세지 custom 하여 DAG에서 정하는 원하는 정보를 받아 볼 수 있다. (여기서는 DAG의 task의 성공, 실패여부만 다뤘다.) 

  • airflow가 자체적으로 제공하는 slack operator가 존재하지만 airflow의 버전 의존성이 존재하기 때문에 python slack_sdk를 활용했다.

3-1.Slack 알람 Class 파일 생성

slack_sdk 패키지가 설치 가정. → pip install slack_sdk

추가적으로 log_uri, exception 정보, execution_time 등을 custom을 통해 전달 받을 수 있다.

from slack_sdk import WebClient
from datetime import datetime

# slack 알람 class
class SlackAlert:
     def __init__(self, channel, token):
         self.channel = channel
         self.client = WebClient(token = token)

     def success_msg(self, msg): # dag 성공시 message
         text = f"""
             date : {datetime.today().strftime('%Y-%m-%d')}
             alert:
                DAG success!
             task id : {msg.get('task_instance').task_id},
             dag id : {msg.get('task_instance').dag_id}
             """
         self.client.chat_postMessage(channel = self.channel, text = text)

     def fail_msg(self, msg): # dag 실패시 message
         text = f"""
             date : {datetime.today().strftime('%Y-%m-%d')}
             alert:
                 DAG Fail!
             task id : {msg.get('task_instance').task_id},
             dag id : {msg.get('task_instance').dag_id}
             """
         self.client.chat_postMessage(channel = self.channel, text = text)

3-2.DAG 구성

앞서 구성한 알람.py 파일의 객체를 import하고 우리가 앞서 받은 token 정보와 token 정보를 주고 받은 채널 정보를 입력하면 슬랙으로 DAG 결과를 메세지로 받을 수 있다. 아래는 DAG 구성이며 callback 옵션에 DAG 성공 및 실패시 보낼 메세지 함수를 인자로 넣어 주었다.

from datetime import timedelta, datetime
from airflow import DAG
import pendulum
import os
from pyspark.sql import SparkSession
import requests
from airflow.models import Variable
from utils.slack_alert import SlackAlert

slack = SlackAlert("#hs_monitoring", "xoxb-2000356498307-5565498700550-HjJ2nPw0lWSQD7O8SbZDlDTT")

# timezone
kst = pendulum.timezone('Asia/Seoul')

dag_args = {
    'owner': 'user',
    'start_date': datetime(2023, 7, 12, tzinfo=kst)}

# dag
dag = DAG(
    dag_id = 'test',
    default_args = dag_args,
    on_success_callback = slack.success_msg,
    on_failure_callback = slack.fail_msg,
    schedule_interval ='40 10 * * *',
	)

PYSPARK_APP_HOME = Variable.get('PYSPARK_APP_HOME')

# tasks
t1 = SparkSubmitOperator(task_id = 'test',
                         conn_id = 'test_id',
                         application = f'{PYSPARK_APP_HOME}/spark/test',
                         num_executors = ,
                         executor_cores = ,
                         executor_memory = '',
                         driver_memory = '',
                         name = 'test',
                         env_vars = {'PATH': '/usr/local/spark/bin:/usr/bin:/usr/local/bin:$PATH'},
                         dag = dag,
                         application_args=["/home/mahdyne/wd/data/spark_streaming/click_stream","searched_at","12000"]
                         )

t1
300x250
반응형