프로그래밍/PySpark

[PySpark] 백분위수(percentile), 사분위수(quantile)

히또아빠 2023. 1. 25. 15:05

약 1억 5천만건 데이터에서 특정한 값 기준으로 상위 n개의 데이터를 뽑아내는데 orderBy 후 limit(n)을 이용해 추출했다.

근데 너무 오래걸리더라... 값을 기준으로 정렬하는 연산방식이 spark에서 비효율적이라고 어디서 본거 같은데...

 

그래서 분위수 값을 구해 cut-off 방식으로 데이터를 추출했는데 시간이 더 적게 걸리더라. 

 

우선, 예시를 들기 위해 다음과 같이 PySpark 세팅과 데이터 프레임을 구성했다. 그리고 visit(string 형식)의 값에 대해 분위수 값을 구하려고 한다.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("regexp_replace").getOrCreate()
address = [("RJFK-SLFKW-DFG1T","M BS", "1"),
    ("aedw-dg93r-d62g1","W SE", "2"),
    ("DFGE-FD23k-DA4G1", "M GJ", "3"),
    ("dssf-dg93r-d62g1","W SE", "4"),
    ("grdg-dg93r-d62g1","W SE", "5"),
    ("hrth-dg93r-d62g1","W SE", "6"),
    ("hjtr-dg93r-d62g1","W SE", "7"),
    ("rnhe-dg93r-d62g1","W SE", "8"),
    ("snbf-dg93r-d62g1","W SE", "9"),
    ("shlr-dg93r-d62g1","W SE", "10"),]
df =spark.createDataFrame(address,["id","demo","visit"])
df.show()

df.printSchema()

#+----------------+----+-----+
#|              id|demo|visit|
#+----------------+----+-----+
#|RJFK-SLFKW-DFG1T|M BS|    1|
#|aedw-dg93r-d62g1|W SE|    2|
#|DFGE-FD23k-DA4G1|M GJ|    3|
#|dssf-dg93r-d62g1|W SE|    4|
#|grdg-dg93r-d62g1|W SE|    5|
#|hrth-dg93r-d62g1|W SE|    6|
#|hjtr-dg93r-d62g1|W SE|    7|
#|rnhe-dg93r-d62g1|W SE|    8|
#|snbf-dg93r-d62g1|W SE|    9|
#|shlr-dg93r-d62g1|W SE|   10|
#+----------------+----+-----+

#root
# |-- id: string (nullable = true)
# |-- demo: string (nullable = true)
# |-- visit: string (nullable = true)
import time
import pyspark.sql.functions as F

1.사분위수(Quantile)

전체 데이터를 4등분 하는 수,

 제 1 사분위수(Q1) - 25 백분위수

제 2 사분위수(Q2) - 50 백분위수

제 3 사분위수(Q3) - 75 백분위수

 

*사분위수 범위: Q3 - Q1

start = time.time()

quantile_70 = df.select(F.col("visit").cast('float')).approxQuantile("visit", [0.7], 0.001)
print(quantile_70[0])
df.filter(F.col("visit") > quantile_70[0]).show()

end = time.time()
print(f"{end - start:.5f} sec")

#7.0
#+----------------+----+-----+
#|              id|demo|visit|
#+----------------+----+-----+
#|rnhe-dg93r-d62g1|W SE|    8|
#|snbf-dg93r-d62g1|W SE|    9|
#|shlr-dg93r-d62g1|W SE|   10|
#+----------------+----+-----+

#0.29151 sec

PySpark 데이터 프레임 내장함수 'approxQuantile' 함수 있음. 근데, string 형식에서 바로 적용 안돼서 cast 함수를 이용해 'float' or 'double'로 변형시켜 사용했음.

함수의 세번째 인자는 approxQuantile 함수가 러프하게 Qunatile을 계산해 산출하기 때문에 오차범위를 지정해줌.

percentile(사분위수)함수에 비해 빠른 연산으로 이거 사용하는게 나을듯.

 

 

2.백분위수(percentile)

전체를 100으로 보고 작은 수 부터  큰 수 나열했을때 n 번째 오는 수, 상대적인 위치.

0백분위: 최소값

100백분위: 최대값

50백분위: 중앙값

start = time.time()

quan_70 = df.agg(F.expr('percentile(visit, 0.7)')).collect()
print(quan_70[0][0])
df.filter(F.col("visit") > quan_70[0][0]).show()

end = time.time()
print(f"{end - start:.5f} sec")

#7.3
#+----------------+----+-----+
#|              id|demo|visit|
#+----------------+----+-----+
#|rnhe-dg93r-d62g1|W SE|    8|
#|snbf-dg93r-d62g1|W SE|    9|
#|shlr-dg93r-d62g1|W SE|   10|
#+----------------+----+-----+

#0.33991 sec
start = time.time()

quan_70 = df.agg(F.expr('approx_percentile(visit, 0.7)')).collect()
print(quan_70[0][0])
df.filter(F.col("visit") > quan_70[0][0]).show()

end = time.time()
print(f"{end - start:.5f} sec")

#7.0
#+----------------+----+-----+
#|              id|demo|visit|
#+----------------+----+-----+
#|rnhe-dg93r-d62g1|W SE|    8|
#|snbf-dg93r-d62g1|W SE|    9|
#|shlr-dg93r-d62g1|W SE|   10|
#+----------------+----+-----+

#0.32838 sec

백분위수(percentile) 함수는 spark 데이터 프레임 내장 함수가 따로 없고 sql 함수를 사용하는 듯. 사분위수(quantile) 함수를 사용하는게 더 빨라 보임.

전체를 100으로 보고 계산하기 때문에 percentile은 상위 30% 값인 7을 출력하는게 아니라 7.3을 출력하는듯. approx_percentile 함수는 approx_Quantile 함수랑 동일한 값 출력. 

그리고 Quntile 함수랑은 다르게 string에 그대로 적용되긴 함.

 

결론, quantile cut-off로 잘라서 orderby하면 더 빠르긴 할 듯 전체 데이터 대상으로 sort해서 limit 하는거 보다는.

300x250
반응형