프로그래밍/PySpark

[PySpark] array 값 합계 컬럼 생성하기

히또아빠 2023. 6. 3. 20:00

 

 

PySpark의 Column() 함수를 사용하여 열에 있는 배열 값의 합계를 계산하려면 expr() 함수를 aggregate() 함수와 함께 사용할 수 있다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, aggregate

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])]
df = spark.createDataFrame(data, ["Name", "Numbers"])

# if error with pyspark.errors.exceptions.captured.AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve
df = df.withColumn("Numbers", expr("transform(Numbers, x -> cast(x as int))"))

# Compute the sum of array values and add a new column
df_with_sum = df.withColumn("Sum", expr("aggregate(Numbers, 0, (acc, x) -> acc + x)"))

# Show the resulting DataFrame
df_with_sum.show(truncate=False)

+-------+------------+---+
|Name   |Numbers     |Sum|
+-------+------------+---+
|Alice  |[1, 2, 3]   |6  |
|Bob    |[4, 5]      |9  |
|Charlie|[6, 7, 8, 9]|30 |
+-------+------------+---+

위의 예에서 "이름"과 "숫자"라는 두 개의 열이 있는 데이터 프레임을 생성했다.

expr("집계(숫자, 0, (acc, x) -> acc + x)")는 배열 값의 합계를 계산하는 데 사용했다.

aggregate() 함수에는 세가지 인자가 사용된다.

 

- 집계할 열(Numbers),

- 초기 값(0),

- 합 연산을 수행하는 람다 함수 ((acc, x) -> ac + x)


withColumn("Sum", expr("aggregate(숫자, 0, (acc, x)) -> acc + x) 메서드로 계산된 배열 값의 합으로 새 열 "Sum"을 데이터 프레임에 추가한다.

 

단, aggregate() 함수에서 데이터 유형 불일치로 error을 내뱉는 경우가 있다.

그럴 경우 aggregate() 이전에 transform() 함수로 array 내 배열 값을 정수(Int)로 변환 해주고 합계 열을 만들어 줘야 한다.

 

 

300x250
반응형