pyspark 17

[PySpark] array 값 합계 컬럼 생성하기

PySpark의 Column() 함수를 사용하여 열에 있는 배열 값의 합계를 계산하려면 expr() 함수를 aggregate() 함수와 함께 사용할 수 있다. from pyspark.sql import SparkSession from pyspark.sql.functions import col, expr, aggregate # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Create a sample DataFrame data = [("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])] df = spark.createDataFrame(data, ["Name", "Numbe..

[PySpark] 데이터프레임 값을 리스트로 반환하기

pyspark 데이터프레임의 특정 column의 value들을 리스트로 반환하는 방법이다. 여기서 x[2]는 cat2 column을 지칭하고 고유값이 아니라 전체 value를 list로 반환하려면 distinct() 없이 작업을 하면 된다. df2.show(3) #+---+----+----+--------+ #|ind| id|cat2|cat2_cnt| #+---+----+----+--------+ #| 1|2868| 167| 16| #| 2|1737| 157| 24| #| 3|1476| 189| 3| #+---+----+----+--------+ #only showing top 3 rows cat2_list = df2.rdd.map(lambda x: x[2]).distinct().collect() cat..

[PySpark] 빈 데이터 프레임 생성하고 데이터 집어넣기

pyspark에서 다음과 같이 테이블의 스키마를 지정하고 빈 데이터 테이블을 만들 수 있다. 스키마 지정시 뭐 int, float, double 등 데이터 유형을 지정할 수 있는데 임의로 string 데이터 형식을 지정해뒀다. spark 세션과 config는 본인의 환경에 맞게 설정하면 된다. from pyspark.sql import SparkSession from pyspark import SparkConf # spark-conf conf = SparkConf() # conf.set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.0") conf.set("spark.driver.memory", "-g") conf.set("spark.exec..

[PySpark] 랜덤표본추출(sample, sampleBy, take_Sample)

Pyspark 데이터 프레임에서 랜덤 샘플링하는 방법을 찾아보다가 세가지 방법이 있길래 정리하려고 한다. 우선, 샘플링하고자 하는 데이터 프레임을 생성했다. from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[1]").appName("regexp_replace").getOrCreate() address = [("RJFK-SLFKW-DFG1T","M BS", "1"), ("aedw-dg93r-d62g1","W SE", "2"), ("DFGE-FD23k-DA4G1", "M GJ", "3"), ("dssf-dg93r-d62g1","W SE", "4"), ("grdg-dg93r-d62g1","W SE", "5"), ("h..

[PySpark] SparkConf로 Spark 환경설정

Spark를 사용하기 이전에 SparkConf 객체를 사용해 Java system properties를 사용한다. 다같이 사용하는 분석서버에서 데이터 처리나 분석을 할 때 상황에 맞게 적절한 core수 제어나 메모리를 지정한다면 제한된 리소스로 효율적인 데이터 처리 및 분석이 가능하다. from pyspark.sql import SparkSession from pyspark import SparkConf # spark-conf conf = SparkConf() conf.set("spark.driver.memory", "50g") conf.set("spark.executor.memory", "30g") conf.set("spark.ui.port","4051") spark = SparkSession.buil..

[PySpark] 백분위수(percentile), 사분위수(quantile)

약 1억 5천만건 데이터에서 특정한 값 기준으로 상위 n개의 데이터를 뽑아내는데 orderBy 후 limit(n)을 이용해 추출했다. 근데 너무 오래걸리더라... 값을 기준으로 정렬하는 연산방식이 spark에서 비효율적이라고 어디서 본거 같은데... 그래서 분위수 값을 구해 cut-off 방식으로 데이터를 추출했는데 시간이 더 적게 걸리더라. 우선, 예시를 들기 위해 다음과 같이 PySpark 세팅과 데이터 프레임을 구성했다. 그리고 visit(string 형식)의 값에 대해 분위수 값을 구하려고 한다. from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[1]").appName("regexp_replace").ge..

[PySpark] regexp_replace 함수

PySpark 데이터 프레임에 있는 string value들을 다른값으로 바꾸거나 처리하는데 SQL string functions인 regexp_replace(), translate() 및 overlay()등을 사용할 수 있다. 그 중에서 PySpark SQL 함수인 regexp_replace() 사용하면 string column을 another string/substring column으로 생성할 수 있다. 예시를 보여주기 위해 우선, 데이터 프레임을 생성한다. 각각 고유식별 번호, 성별 + 지역, 출생일 이다. from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[1]").appName("regexp_replac..