PySpark의 Column() 함수를 사용하여 열에 있는 배열 값의 합계를 계산하려면 expr() 함수를 aggregate() 함수와 함께 사용할 수 있다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, aggregate
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a sample DataFrame
data = [("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])]
df = spark.createDataFrame(data, ["Name", "Numbers"])
# if error with pyspark.errors.exceptions.captured.AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve
df = df.withColumn("Numbers", expr("transform(Numbers, x -> cast(x as int))"))
# Compute the sum of array values and add a new column
df_with_sum = df.withColumn("Sum", expr("aggregate(Numbers, 0, (acc, x) -> acc + x)"))
# Show the resulting DataFrame
df_with_sum.show(truncate=False)
+-------+------------+---+
|Name |Numbers |Sum|
+-------+------------+---+
|Alice |[1, 2, 3] |6 |
|Bob |[4, 5] |9 |
|Charlie|[6, 7, 8, 9]|30 |
+-------+------------+---+
위의 예에서 "이름"과 "숫자"라는 두 개의 열이 있는 데이터 프레임을 생성했다.
expr("집계(숫자, 0, (acc, x) -> acc + x)")는 배열 값의 합계를 계산하는 데 사용했다.
aggregate() 함수에는 세가지 인자가 사용된다.
- 집계할 열(Numbers),
- 초기 값(0),
- 합 연산을 수행하는 람다 함수 ((acc, x) -> ac + x)
withColumn("Sum", expr("aggregate(숫자, 0, (acc, x)) -> acc + x) 메서드로 계산된 배열 값의 합으로 새 열 "Sum"을 데이터 프레임에 추가한다.
단, aggregate() 함수에서 데이터 유형 불일치로 error을 내뱉는 경우가 있다.
그럴 경우 aggregate() 이전에 transform() 함수로 array 내 배열 값을 정수(Int)로 변환 해주고 합계 열을 만들어 줘야 한다.
300x250
반응형
'프로그래밍 > PySpark' 카테고리의 다른 글
[PySpark] explode, explode_outer 함수 차이 (0) | 2023.07.07 |
---|---|
[PySpark] dense 벡터와 sparse 벡터, UDF로 sparse vector 만들기 (0) | 2023.06.08 |
[PySpark] 데이터프레임 값을 리스트로 반환하기 (0) | 2023.02.10 |
[PySpark] 빈 데이터 프레임 생성하고 데이터 집어넣기 (0) | 2023.02.10 |
[PySpark] 랜덤표본추출(sample, sampleBy, take_Sample) (1) | 2023.02.03 |