다음과 같은 'data'라는 데이터 프레임이 있다. 그리고 데이터 프레임에 collect_list(site_app)이라는 array 구조의 컬럼이 있는데 해당 컬럼에서 list_A의 list value 값이 몇개가 있는지 궁금하다.
data = [("oaid_1", ["app1", "app2", "app3"], [10, 20, 30], [1, 2, 3]),
("oaid_2", ["app2", "app4", "app5"], [15, 25, 35], [2, 4, 5]),
("oaid_3", ["app1", "app3", "app4"], [12, 22, 32], [1, 3, 4])]
# Create a DataFrame with the sample data
columns = ["oaid", "collect_list(site_app)", "collect_list(log_cnt)", "collect_set(day)"]
df = spark.createDataFrame(data, columns)
# List 'A' for comparison
list_A = ["app1", "app3"]
df.printSchema()
root
|-- oaid: string (nullable = true)
|-- collect_list(site_app): array (nullable = true)
| |-- element: string (containsNull = true)
|-- collect_list(log_cnt): array (nullable = true)
| |-- element: long (containsNull = true)
|-- collect_set(day): array (nullable = true)
| |-- element: long (containsNull = true)
우선, list_A를 'lit', 'array' 함수를 활용하여 array형태로 바꿔줘야 한다. 안그러면 pyspark의 type 관련 error가 난다. 바꾼후 array_intersect를 활용하여 같은 값을 뽑아내고 size 계산을 한다.
list_A_array = array([lit(item) for item in list_A])
# Calculate the size of the intersection between collect_list(site_app) and list 'A'
result_df = df.withColumn("common_app_count", size(array_intersect(df["collect_list(site_app)"], list_A_array)))
result_df.show()
+------+----------------------+---------------------+----------------+----------------+
| oaid|collect_list(site_app)|collect_list(log_cnt)|collect_set(day)|common_app_count|
+------+----------------------+---------------------+----------------+----------------+
|oaid_1| [app1, app2, app3]| [10, 20, 30]| [1, 2, 3]| 2|
|oaid_2| [app2, app4, app5]| [15, 25, 35]| [2, 4, 5]| 0|
|oaid_3| [app1, app3, app4]| [12, 22, 32]| [1, 3, 4]| 2|
+------+----------------------+---------------------+----------------+----------------+
300x250
반응형
'프로그래밍 > PySpark' 카테고리의 다른 글
[PySpark] 특정(여러) 문자열(strings)이 포함된 데이터 필터로 뽑아내기. (0) | 2023.08.31 |
---|---|
[PySpark] 학습된 로지스틱 모형의 계수 확인하기. (0) | 2023.08.25 |
[PySpark] Union(= unionAll) 함수로 두 데이터 프레임 합치기 (0) | 2023.07.11 |
[PySpark] explode, explode_outer 함수 차이 (0) | 2023.07.07 |
[PySpark] dense 벡터와 sparse 벡터, UDF로 sparse vector 만들기 (0) | 2023.06.08 |