python 18

airflow에서 start_time, execution_time, backfill, catchup

1.airflow의 시간 https://airflow.apache.org/docs/apache-airflow/stable/faq.html#what-s-the-deal-with-start-date: start_date, execution_date 공식문서 1-1.start_date 실행 날짜가 아니라 스케줄 시작 시간으로 DAG 첫 실행은 'start_date + 실행주기' 에 실행하게 된다. start_date: 2023-08-01 00:00:00 hourly job: schedule: 0 * * * * (매시 0분에 실행) → DAG의 첫 실행은 2023-08-01 01:00:00 daily job: schedule: 0 10 * * * (매일 10시 0분에 실행) → DAG의 첫 실행은 2023-08..

airflow DAG 결과를 Slack API로 메세지 받아보기

기존에 Jenkins를 이용해 코드 배치관리를 진행했는데 코드 관리나 배치 관리 복잡도가 증가하면서 모델별로 하나의 DAG로 관리하고 체계적인 system화를 위해 airflow로 배치를 옮기고 있다. 거기다 기존의 hive sql을 pyspark로 구현해서 정리까지 하고있는데 병아리 수준에서 시스템, 코드를 뜯어보고 새롭게 구성하려고 하니 에러 투성이다 ㅠ Slack에서 제공되는 API를 활용하여 구성해둔 DAG가 정상 작동 했는지 메세지를 전달 받아 모니터링을 효율적으로 하는 목적으로 해당 페이지를 작성했다. 참고로 Slack과 airflow는 설치 및 세팅이 돼 있다는 가정하에 작성했고 DAG 구성에 대한 디테일한 설명은 없다. 추후에 공부하고 정리해서 따로 올리려고 한다. [목차] 1.airflo..

[PySpark] explode, explode_outer 함수 차이

다음과 같은 스키마를 가진 테이블을 생성했다. an_array 라는 array 컬럼 하나와 a_map 으로 묶어진 key - value 컬럼이 존재한다 df = spark.createDataFrame( [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], ("id", "an_array", "a_map") ) df.show() +---+----------+----------+ | id| an_array| a_map| +---+----------+----------+ | 1|[foo, bar]|{x -> 1.0}| | 2| []| {}| | 3| null| null| +---+----------+----------+ df.printSchema()..

[pandas] 날짜차이 컬럼 생성하기

판다스 데이터 프레임에서 특정 고정 날짜와 datetime64 열 사이의 기간을 나타내는 새 열을 만들려면 datetime64 열에서 고정 날짜를 뺄 수 있습니다. 그러면 고정 날짜와 각 날짜 시간 값 사이의 시간 차이를 나타내는 time delta64 데이터 유형이 생성된다. import pandas as pd # Example DataFrame df = pd.DataFrame({'Date': ['2023-05-10', '2023-05-11', '2023-05-12'], 'Time': ['09:00:00', '10:30:00', '14:15:00']}) # Convert 'Date' and 'Time' columns to datetime64 type df['Date'] = pd.to_datetime(d..

[프로그래머스] 데이터 엔지니어 study - 7주차

1.Spark 기초 hadoop이 1세대라면 스파크 2세대 빅데이터 처리 기술 YARN등을 분산환경으로 사용 Scala로 작성됨 등장 버클리 대학 AMPLab에서 아파치 오픈소스 프로젝트로 2013년 시작 Databricks 스타트업 창업 Spark 3.0 구성 Spark Core: pandas의 데이터 프레임과 같음 Spark SQL Spark ML: scikit learn의 분산 버전 Spark Streaming Spark GraphX 자체 서버 엔진을 들고있는게 아니라 하둡(YARN), Kubernetes 와 같은 resource 매니저 위에서 돌아감 Spark vs MapReduce Spark 기본적으로 메모리 기반 → 메모리 부족시 디스크 사용 하둡(YARN) 이외에도 다른 분산 컴퓨팅 환경 지..

[PySpark] 데이터프레임 값을 리스트로 반환하기

pyspark 데이터프레임의 특정 column의 value들을 리스트로 반환하는 방법이다. 여기서 x[2]는 cat2 column을 지칭하고 고유값이 아니라 전체 value를 list로 반환하려면 distinct() 없이 작업을 하면 된다. df2.show(3) #+---+----+----+--------+ #|ind| id|cat2|cat2_cnt| #+---+----+----+--------+ #| 1|2868| 167| 16| #| 2|1737| 157| 24| #| 3|1476| 189| 3| #+---+----+----+--------+ #only showing top 3 rows cat2_list = df2.rdd.map(lambda x: x[2]).distinct().collect() cat..

[PySpark] 빈 데이터 프레임 생성하고 데이터 집어넣기

pyspark에서 다음과 같이 테이블의 스키마를 지정하고 빈 데이터 테이블을 만들 수 있다. 스키마 지정시 뭐 int, float, double 등 데이터 유형을 지정할 수 있는데 임의로 string 데이터 형식을 지정해뒀다. spark 세션과 config는 본인의 환경에 맞게 설정하면 된다. from pyspark.sql import SparkSession from pyspark import SparkConf # spark-conf conf = SparkConf() # conf.set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.0") conf.set("spark.driver.memory", "-g") conf.set("spark.exec..

[PySpark] regexp_replace 함수

PySpark 데이터 프레임에 있는 string value들을 다른값으로 바꾸거나 처리하는데 SQL string functions인 regexp_replace(), translate() 및 overlay()등을 사용할 수 있다. 그 중에서 PySpark SQL 함수인 regexp_replace() 사용하면 string column을 another string/substring column으로 생성할 수 있다. 예시를 보여주기 위해 우선, 데이터 프레임을 생성한다. 각각 고유식별 번호, 성별 + 지역, 출생일 이다. from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[1]").appName("regexp_replac..