Data Engineering/책정리

[빅데이터를 지탱하는 기술] Ch5.빅데이터 파이프관리

히또아빠 2023. 5. 7. 22:44

5-1.워크플로우 관리

<[기초지식] 워크플로 관리>

  • 정해진 업무를 원활하게 진행하기 위한 구조
  • 워크플로 관리도구
    • 정기적으로 태스크 실행
    • 비정상적인 상태 감지후 해결
    • ex) Airflow, azkaban, Digdag, Luigi, Oozie
  • 워크플로 관리도구와 태스크
    • 정해진 개별 반복 처리를 태스크(Task)라고 함
    • 도구를 사용하는 이유는 태스크 실행의 실패 때문
    • 태스크 수 증가로 재실행이 어려워 도구를 활용함
  • 워크플로 관리 도구의 기능
    • 정기적인 스케쥴로 태스크 실행후 통지
    • 태스크간 의존 관계를 정하구 정해진 순서대로 실행
    • 태스크 실행결과 보관, 오류 발생시 재실행
  • 워크플로 관리 도구의 종류
    • 선언 형과 스크립트 형
      • 선언형(declarative)
        • XML, YAML 등의 서식으로 워크플로 기술
        • 미리 제공된 기능만 이용가능하지만 최소한의 기술로 태스크 정의 가능
        • 누가 작성하더라도 동일한 워크플로가 돼 유지보수성 높아짐
        • 동일 쿼리를 파라미터를 수정해서 여러 번 실행하거나 단순 반복적으로 워크플로 자동생성하는 경우에 사용
      • 스크립트형(scripting)
        • 태스크의 정의를 프로그래밍할 수 있어 유연성
        • 데이터 처리를 태스크 안에서 실행하는것도 가능
# Airflow에 의한 워크플로 정의
# 셀 스크립트의 템플릿 정의
SCRIPT = '''
aws s3 cp --recursive s3://example/logs/{{ ds }}/ .

# 셀 스크립트를 실행하는 태스크 등록
task = BashOperator(task_id = 'data_transfer', bash_command = SCRIPT)

<오류로 부터 복구방법 먼저 생각하기>

  • 복구와 플로우 재실행
    • 통신오류와 같이 몇 차례 반복하면 성공하는 오류
      • 기다리면 대처 가능
    • 인증오류와 같이 몇 차례 반복해도 실패하는 오류
      • 수동으로 대처
    • 실패한 태스크는 모두 기록하여 나중에 재실행할 수 있도록 해야함
    • 일련의 태스크는 플로우(Flow)
    • 각 플로우에는 실행 시에 고정 파라미터 부여
      • 일별 배치라면 특정 날짜가 파라미터
      • 동일 플로우에 동일 파라미터를 부여하면 완전히 동일한 태스크가 실행 되도록 함
      • 나중에 실패하더라도 재실행 가능함
      • 워크플로 관리 도구에서는 실패한 플로우 선택해서 재실행만으로 복구 가능
    • 되도록 태스크를 작게 유지
  • 재시도
    • 여러번 발생하는 오류를 재시도(Retry) 자동화하여 수작업 없이 복구
    • 재시도 간격을 5분이나 10분 정도로 두기
    • 재시도 횟수 설정 주의
      • 적으면 장애가 복구 되기전에 재시도 종료
      • 많으면 실패하지 않은것처럼 되기 때문에 모르고 넘어갈 수 있음
    • 오류의 원인을 그때마다 조사해 일어나지 않도록 대책 마련하는게 가장 좋은 해결책
  • 백필(backfill)
    • 플로우 전체를 처음부터 다시 실행
    • 파라미터에 포함된 일시를 순서대로 바꿔가면서 일정 기간의 플로우를 연속해서 실행
    • 대규모 백필 실행시 자동적인 재시도는 무효로 하고 오류는 모두 통지하는 편이 좋음

<멱등한 조작으로 태스크 기술하기>

  • 각 태스크는 원칙적으로 마지막까지 성공하거나, 실패하면 존재하지 않음을 전제로 해야함
  • 원자성 조작(atomic operation)
    • INSERT 문 2회 호출하는 태스크, 첫 번째 INSERT 종료한 상황에 오류발생시 태스크 재실행하면 데이터 중복 발생
    • 각 태스크가 시스템에 변경을 가하는 것을 한 번만 하도록
      • 트랜잭션 처리에 대응한 DB라면 여러번의 쓰기를 한 번의 트랜잭션으로 실행
      • 아니라면 쓰기가 필요한 수만큼 태스크 나눔
    • 원자성 조작에서의 문제
      • 태스크 구현상의 버그로 인해 원자성 조작 직후에 문제 발생하면 원자성 조작 자체는 성공하고 있음에도 워크플로 관리 도구는 오류를 여기는 경우가 있음
  • 멱등한 조작(idempotent operation)
    • 동일한 태스크를 여러번 실행해도 동일한 결과
    • SQL, 테이블 삭제후 다시 만들기
    • 원칙적으로 항상 데이터를 덮어 써야함, 각 태스크는 추가(append) or 치환(replace)
# t1 테이블 있으면 삭제
DROP TABLE IF EXISTS "tl";
# t1 테이블 작성
CREATE TABLE "t1" (...);
# t1 테이블 쓴다
INSERT INTO "t1" ...;
  • 멱등한 추가
    • 테이블 파티셔닝
    • 파티션 단위로 치환하면 멱등하게 테이블 유지 가능함
  • 원자성을 지닌 추가
    • 복잡한 플로우에서 하나의 테이블에 몇 번이고 데이터 써 넣음
    • 추가 반복이 아니라 중간 테이블을 만들어 처리 후, 마지막에 목적 테이블에 한 번에 추가하는 것이 안전
    • 문제 발생시 중간테이블만 삭제하고 재실행 가능
  • 재실행의 안전성을 높이기 위해서는 각 플로우가 전체로서 멱등하게 되도록 구현
# 중간테이블 작성(치환)
DROP TABLE IF EXISTS "t1";
CREATE TABLE "t1" (...);
INSERT INTO "t1" ...;
INSERT

<태스크 큐 - 자원의 소비량 컨트롤>

  • 예시) 2메가바이트의 압축이 안 된 텍스트 파일 1만개 = 20기가바이트
    • 하나의 파일을 압축해서 전송하는데 5초 걸리면 단순히 1만번 반복시 약 14시간
    • 각 태스크는 파일 서버로부터 파일을 추출하여 압축하고 분산 스토리지로 전송, 태스크는 셸 스크립트화하여 워크플로 관리 도구 안에서 호출
    • 대량의 태스크를 동시 실행시 서버 과부하가 걸리므로 제한 필요, 잡 큐(job queue), 태스크 큐(task queue)
    • 모든 태스크는 큐에 저장되고 일정 수의 워커 프로세스가 순서대로 꺼내면서 병렬화
    • 8개가 병렬로 태스크 실행
  • 병목 현상의 해소
    • 실제로 8코어 서버에 8개의 워커는 너무 부족, 각 태스크는 CPU 뿐만 아니라 디스크 I/O, 네트워크 I/O 소비
    • 워커의 수를 늘리면 좀 더 실행 속도를 높일 수 있으나 너무 증가시키면 어디선가 병목 현상이 발생해 성능 향상이 한계점에 도달해 오류 발생
    • 서버의 내부적 요인(증상 - 대책)
      • CPU 사용률 100% - CPU 코어 수 늘림, 서버 증설
      • 메모리 부족 - 메모리 증설, 스왑 디스크 추가, 태스크 작게 분할
      • 디스크 넘침 - 각 태스크가 임시 파일을 삭제하고 있는지 확인, 디스크를 증설
      • 디스크 I/O 한계 - SSD 등의 고속 디스크 사용, 여러 디스크로 분산
      • 네트워크 대역의 한계 - 고속 네트워크 사용, 데이터 압축률을 높임
      • 통신 오류나 타임 아웃 - 시스템 상의 한계 가능성, 서버 분리
    • 서버의 외부적 요인
      • 파일 복사에서 오류 발생시 파일서버의 성능 한계, 워커를 늘리는건 역효과
      • 분산 스토리지로 쓰기 빈도 높아 오류발생시 쓰기 빈도를 줄여야함
  • 태스크 수의 적정화
    • 태스크에는 날짜와 시간이 파라미터로 건네짐
    • 작은 파일을 모아서 하나의 파일로 하거나, 여러 파일을 한 번에 업로드
    • 그러한 태스크를 좀 더 많은 워커로 동시에 실행해 처리 효율 최대화
    • 최적화 프로세스는 Hadoop과 같은 분산 시스템에서도 동일함

5-2.배치 형의 데이터 플로우

<MapReduce의 시대는 끝났다>

  • 데이터 플로우: 다단계의 데이터 퍼리를 그대로 분산 시스템의 내부에서 실행
  • MapReduce의 구조
    • MapReduce 대체
      • 구글 - MillWheel
      • Hadoop - Tez
      • Spark

  • 3: 분할된 데이터를 처리하는 첫 단계를 ‘Map’
  • 4: 그 결과를 모아서 집계하는 두 번째 단계를 ‘Reduce’
  • 3,4를 반복하면서 목적하는 결과를 얻을 때까지 계속해서 데이터를 변환해 나가는 구조
  • 복잡한 데이터 처리에서는 Map, Reduce 여러번 반복해야 하므로 대기시간 길어짐
  • 애드 혹 데이터 분석에서 요구되는 간단한 집계는 MapReduce로 실현하는 것이 어려움

 

<MapReduce를 대신할 새로운 프레임워크 - DAG>

  • DAG(directed actclic graph, 방향성 비순환 그래프)
    • 노드와 노드가 화살표로 연결(방향성)
    • 화살표를 따라가도 동일 노드로 되돌아 오지 않음(비순환)
  • 기존의 MapReduce도 Map과 Reduce의 두 종류의 노드로 이루어진 간단한 DAG, 그러나 하나의 노드 처리가 안끝나면 다음으로 진행 x
  • DAG를 구성하는 각 노드가 모두 동시 병행으로 실행, MapReduce에 존재했던 대기 시간을 없앰

# 1: 파일로부터 데이터를 읽어 들인다.
lines = sc.textFile("sample.txt")
# 2: 파일의 각 행을 단어로 분해
words = lines.flatMap(lambda line: line.split())
# 3,4,5: 단어마다의 카운터를 파일에 출력
words.map(lambda word: (word, 1))\
		.reduceByKey(lambda a, b: a + b)\
    .saveAsTextFile("word_counts") # 실행 개시
  • DAG에 의한 프로그래밍 특징은 ‘지연평가(lazy evaluation)’
  • 먼저 데이터 파이프라인 전체를 DAG로 조립하고 실행에 옮김, 내부 스케줄러가 분산 시스템에 효과적인 실행 계획을 세워주는게 데이터 플로우의 장점

<데이터 플로우와 워크플로를 조합하기>

  • 데이터를 읽어들이는 플로우
    • 데이터 성능적으로 안정된 분산 스토리지에 복사하여 사용, 외부 데이터 소스애 여러 번 접속하면 성능 문제 발생
    • 외부의 데이터 소스에서 데이터 읽어 들일 때는 벌크 형의 전송 도구로 태스크를 구현
    • 데이터 복사만 와료되면 데이터 플로우 전문 분야, 텍스트 데이터 가공, 열 지향 스토리지로 변환 등 부하가 큰 처리는 데이터 플로우로서 실행

  • 데이터를 써서 내보내는 플로우
    • 데이터 집계 결과를 외부 시스템에 써서 내보내는 경우 데이터 플로우 안에서 대량의 데이터를 외부로 전송하는 것은 피하는 편이 무난
    • 벌크 형의 전송 도구를 사용하여 태스크 구현 하거나, 외부 시스템 쪽에서 파일을 읽어 들이조록 지시

<데이터 플로우와 SQL을 나누어 사용>

  • SQL을 MPP 데이터 베이스에서 실행
    • 데이터 웨어하우스의 파이프라인
  • 분산 시스템상의 쿼리 엔진에서 SQL 실행
    • 데이터마트의 파이프라인
  • 대화식 플로우
    • 애드 록 분석에서는 많은 데이터 처리를 수작업으로 시행하므로 워크플로 필요 x
    • 구조화 되지 않은 데이터를 애드 혹 분석시 데이터 플로우가 유용
  • 대화식 플로우
    • 애드 록 분석에서는 많은 데이터 처리를 수작업으로 시행하므로 워크플로 필요 x
    • 구조화 되지 않은 데이터를 애드 혹 분석시 데이터 플로우가 유용
300x250
반응형