Data Engineering/프로그래머스 study 11기

[프로그래머스] 데이터 엔지니어 study - 3주차

히또아빠 2023. 2. 1. 16:01

0.커리어를 시작하는 관점에서

  • 커리어는 정글짐 - 방향성 다양
  • 40전에 다양한 경험
  • 전문성 - 학습할 자신감, 변화 두려워 하지 않는 마음
  • 1.결과를 내는데 집중하기
    • 성취하는 경험하기
    • 어떤 결과를 낼 것인지 생각
    • 일을 왜하고 어떻게 끝내야 성공으로 끝나는지 나에게 일 준 사람 입장에서 생각하고 문제 정의
    • 중요하지 않는 일은 대충하기
  • 2.성장을 저해하는 요소
    • 나이 혹은 남과의 비교
    • 나에 대한 고정관년
    • 과거의 상처
  • 3.인격적인 성숙
    • 긍정적인 태도
    • 불만 1개, 감사 1개
    • 주기적으로 회고
    • 실수를 인정하는 여유, 모를때 모른다
  • 4.시작과 꾸준함의 중요성 - 복리가 있는 일
    • 시작이 반
    • 꾸준히 매일
    • 운동, 학습(호기심), 네트워킹(사람 만나기), 책읽기(글쓰기)
  • 5.네트워크의 중요성
    • 사람들 만날때 호기심 갖기
    • 첫 인상에 사로잡히지 않기
    • 좋은 평판 유지
    • 좋은 사람들과 일하기
  • 6.자신의 강점과 약점 이해
    • 강점을 최대화 하는 일 찾기
    • 강점이 약점이 되는 순간을 잘 인지하기
    • comfort zone에서 나오기
    • 노력해서 바뀌든가, 다른 곳으로 가든가
  • 7.사기 증후군(imposter stndrome) 극복하기
    • 주변 사람이 나보다 잘나 보인다면
    • 자신감 쌓기, 성취하는 경험

숙제 review

  • row_number, WITH subquery, self join
  • first_value, last_value 옵션이 있어 직관적이지 않음(rows between)
  • join 관계 생각, count(1) 해서 보기

1.Spark/Athena 사용 시나리오

  • 비구조화된 데이터 S3에 저장 후 spark, athena로 데이터 처리
  • Spark ML모델의 feature를 배치로 미리 계산하는 경우 대용량 데이터 병렬처리
    • NoSQL key: value storage
    • ML 사용시 데이터 포멧은 libsvm(id key:value)
  • Spark: 다수의 서버에서 분산된 데이터 처리, SQL 사용, ML 모듈, spark streaming, 사용자 로그인시 이벤트로 다 처리 가능
  • Athena: 빅데이터용 SQL

2.데이터 파이프라인?

  • 용어설명
    • ETL vs ELT
      • ETL: 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스
      • ELT: 데이터 웨어하우스 내부 데이터를 조작해 새로운 요약, 추상화 된 데이터를 만드는 프로세스, DBT(Analytics engineering)
    • Data Lake vs Data Warehouse
      • Data Lake: 정형 데이터 + 비정형 데이터, 과거 데이터 스토리지(보존 정책 없음),데이터 웨어하우스보다 큰 규모, S3(raw format data)
      • Data Warehouse: 일부 보존 정책을 통해 정교하고 구조화된 데이터에 더욱 집중, BI 툴(Looker, Tableau, Superset 등)은 데이터 웨어하우스와 연결
  • 데이터 파이프라인 정의
    • Data pipeline, Data workflow, DAG(directed acyclic graph), ETL(extract transform load)
    • 데이터 소스(production DB, API 등)으로부터 목적지(데이터 웨어하우스, 캐시 시스템 - Redis, Memchche, S3, NoSQL) 로 적재하는 작업
    • 보통 코딩: python, 스칼라 혹은 SQL로 작업
  • 세가지 형태의 데이터 파이프라인
    • 1.Raw Data ETL Jobs - 데이터 엔지니어
      • 외부와 내부 데이터 소스에서 데이터를 읽어다가(API)
      • 적당한 데이터 포맷 변환후(Spark)
      • 데이터 웨어하우스 로드
    • 2.Summary/Report Jobs(ELT)
      • DW(DL)로 부터 데이터를 읽어 DW에 쓰는 ETL
      • Raw Data를 읽어서 일종의 리포트, 써머리 형태의 테이블 다시 만듦
      • AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
      • CTAS는 SQL로 만들고 데이터 분석가가 함, DBT 사용
    • 3.Production Data Jobs
      • DW에서 데이터를 읽어 다른 Storage(프로덕션 환경)로 쓰는 ETL
        • 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
        • 머신러닝 모델에서 필요한 피쳐들을 미리 계산하는 경우
      • 타겟 스토리지
        • Cassandra/HBase/DynamoDB 와 같은 Nosql
        • MySQL과 같은 관계형 DB(OLTP)
        • Redis/Memcache와 같은 캐시
        • ElasticSearch와 같은 검색엔진

3. Airflow

  • 소개
    • 데이터 파이프라인 플랫폼
    • 데이터파이프라인 작성 라이브러리 제공
    • 데이터파이프라인 스케쥴링 + web UI 제공
    • 버전 선택은: GCP같은 큰회사가 쓰는게 뭔지 확인
    • DAG(Directed Acyclic Graph): 개발자가 python으로 작성한 workflow, Task dependency 정의
  • Airflow 구성
    • Web server(python flask로 구현됨): Airflow의 웹 UI 서버
    • scheduler: 모든 DAG와 Task에 대하여 모니터링 및 관리하고, 실행해야할 Task를 스케줄링
    • worker: 실제 Task를 실행하는 주체입니다. Executor 종류에 따라 동작 방식이 다양합니다.
    • database: DAG와 스케쥴링 info DB(기본 SQLite), : Airflow에 존재하는 DAG와 Task들의 메타데이터를 저장하는 데이터베이스
  • Queue(멀티 워커 노드 구성인 경우만 사용, 이 경우 Executor가 달라짐, KubernetesExcutor)
  •  
  • 스케일 업(더 좋은 사양 서버), 스케일 아웃(서버 추가, 관리 복잡도 늘어남, 최대한 뒤로 미뤄야 함)
    • 스케일 아웃 보다는 클라우드 버전으로 가는게 좋음(시간과 노력을 줄여줌)
  • 장, 단점
    • 장점
      • 데이터 파이프라인 세밀한 제어
      • 다양한 데이터 소스와 DW 지원
      • 백필(Backfill) 쉬움
    • 단점
      • 배우기 어려움
      • 상대적으로 개발 환경 구성 어려움
      • 직접 운영이 쉽지 않음(cluster → cloud), 클라우드 버전 선호됨
      • GCP - cloud composer, AWS - Mnaged workflows for Apache Airflow
  • DAG(Directed Acyclic Graph)
    • Airflow 에서는 ETL을 부르는 명칭
    • 테스크(Airflow의 오퍼레이터로 만들어짐)로 구성됨
    • 태스크는 Airflow의 오퍼레이터(Operator)로 만들어짐
      • operator 제공: Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job
  • 기본정보
    • Task에 필요한 기본정보
      • owner: 비지니스 오너
      • start_date
      • end_date
      • email: 실패시 메일
      • retries: 실패시 몇번 실행
      • retry_delay: 실패시 몇번 쉬고 retries 할지
    • DAG에 필요한 기본정보
      • “DAG name”
      • schedule_tinerval: 크론탭 문법을 따름
        • 0 * * * *: 매시 0 분에 실행되는 DAG, 1시간에 한번
        • 0 12 * * *: 매일 한번 12:00에 실행되는 DAG, 하루에 한번
      • tags

4.데이터 파이프라인 작성시 고려사항

  • 이상 및현실
    • 문제없다, 관리 어렵지 않다
    • 버그, 데이터 소스상 이슈, 데이터 파이프라인, DAG들 간의 의존도 이해 부족
    • 데이터 파이프라인 수가 늘어나면 유지보수 비용 증가
  • Best Practices
    • 1.Full refresh
      • Full refresh: 가능하다면 데이터 작은 경우 통으로 읽기
      • Incremental update: 데이터가 클때 대상 데이터소스가 갖추어야 할 조건, created, modified, deleted
    • 2.멱등성(Idempotency) 보장
      • 최종 테이블 내용 변경 x
      • 중복 데이터 생성 x
    • 3.실패시 재실행이 쉬어야 함
      • 과거 데이터 채우는 과정(Backfill)이 쉬어야함
      • inremental update 경우엔 어려움
    • 4.명확성
      • 데이터 파이프라인 입출력 명확히 하고 문서화
      • 안쓰는 데이터 삭제
    • 5.문서화
      • 데이터 파이프라인 사고시 마다 리포트 작성, 동일한 비슷한 사고 방지
      • 중요 데이터 파이프 라인 입출력 확인, 입출력 레코드 수부터 간단히 확인, 써머리 테이블 만들어 내고, 중복레코드 확인 등

5.Daily incremental update 구현

  • 2020년 11월 7일부터 하루치 데이터 읽으면 2020년 11월 8일부터 ETL 작동(DAG 실행)
  • Airflow start_date는 처음 읽어와야하는 데이터 날짜
  • execution_date: 2020.11.07 시스템 변수로 읽어와야하는 날짜 지정
  • incremental하게 1년치 데이터를 Backfill 시
    • 해결방법1: 기존 ETL을 수정해서 지난 1년치 데이터에 대해 돌림
      • 실수하기 쉽고 수정하는데 시간 걸림
      • yesterday에 해당하는 데이터를 소스에서 읽어옴
      from datetime import datetime, timedelta
      y = datetime.now() - timedelta(1)
      yesterday = datetime.strftime(y, '%Y-%m-%d')
      
      • 예를들어, production DB의 특정 테이블에서 읽어온다면
      sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
      
    • 해결방법2: 시스템적으로 해결, Airflow 접근 방식
      • 모든 DAG 실행에는 ‘execution_date 지정되어 있음
      • execution_date로 채워야하는 날짜와 시간이 넘어옴
      • 이를 바탕으로 데이터 갱신하도록 코드작성
      • Backfill 쉬워짐
  • start_date와 execution_date 이해하기
    • start_date: 실행 날짜가 아니라 스케줄 시작 날짜
    • 위의 dag는 1월 2일 8시에 execution_date가 1월 1일인 DAG 첫 실행
    • execution_date는 그대로 1월1일로 유지되는 DAG의 고유 실행 ID와 같음
import pendulum from airflow 
import DAG from datetime 
import datetime, timedelta 
# 한국 시간 timezone 설정 
kst = pendulum.timezone("Asia/Seoul") 

# 한국 시간 2021년 1월 1일 시작, 오전 8시마다 실행되는 DAG 설정 
dag = DAG( dag_id="test_dag", 
		default_args=default_args, 
        start_date=datetime(2021, 1, 1, tzinfo=kst), 
        schedule_interval="0 8 * * *", 
        ) 
        
YESTERDAY = '{{ execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d") }}' 
TODAY = '{{ next_execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d") }}'

 

300x250
반응형