프로그래밍/PySpark

[PySpark] dense 벡터와 sparse 벡터, UDF로 sparse vector 만들기

히또아빠 2023. 6. 8. 17:36

1.vector 개념

희소 벡터를 생성하려면 벡터 길이(엄격하게 증가해야 하는 0이 아닌 값과 0이 아닌 값의 인덱스)를 제공해야 합니다. pyspark.mllib.linag.Vecotors 라이브러리는 dense(고밀도), sparse(희소) 두 유형의 로컬 벡터를 지원한다.
희소 벡터는 벡터안에 숫자가 0이 많은 경우에 사용한다. 희소 벡터를 생성하려면 벡터 길이와 0이 아닌 값과 0이 아닌 값의 인덱스 총 세가지의 인자를 넣어야 한다.

tip!

일반적으로 ML 모델 feature 구성시 vector assembler를 활용해서 features를 하나의 벡터로 구성한다.

Vector Assembler가 메모리를 적게 사용하는 형식 중 하나를 기준으로 dense 벡터와 sparse 벡터를 선택한다.
또한, SparseVector에서 DenseVector로 변환하는데는 UDF가 필요하지 않아 Array() 메서드로 사용하면 된다.
데이터 전처리시 StandardScaler는 Withmean=True로 설정하지 않는 한 SparseVector를 허용한다.

2.sparse vector UDF example

그러면 sparse vector에서 0이 아닌 값과, 인덱스를 각각의 array 컬럼으로 가지고 있을때 sparse 벡터를 만드는 예시이다.
0이 많은 데이터의 경우 ML feature 사용시 sparse vector가 메모리상 효율적이다. spark dataframe은 index를 지원하지 않기 때문에 feature의 활용을 위해서 임의의 index를 생성해야한다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import array , udf
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import VectorUDT, StringType, ArrayType, IntegerType


# Define the schema
schema = StructType([
    StructField("id", StringType()),
    StructField("f_array", ArrayType(IntegerType())),
    StructField("c_array", ArrayType(IntegerType()),),
    StructField("len", IntegerType())
])

# Define the data
data = [
    ("A", (11, 16, 19), (43, 16, 4), 20),
    ("C", (1, 2, 3), (1, 73, 26), 20),
    ("B", (1, 2), (2, 1), 20)
]

df = spark.createDataFrame(data, schema)

df.printSchema()

root
 |-- id: string (nullable = true)
 |-- f_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- c_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- len: integer (nullable = true)

df.show()
+---+------------+-----------+---+
| id|     f_array|    c_array|len|
+---+------------+-----------+---+
|  A|[11, 16, 19]|[43, 16, 4]| 20|
|  C|   [1, 2, 3]|[1, 73, 26]| 20|
|  B|      [1, 2]|     [2, 1]| 20|
+---+------------+-----------+---+

def create_sparse_vector(f_array, c_array):
    size = 20
    return Vectors.sparse(size, f_array, c_array)

sparse_vector_udf = udf(create_sparse_vector, returnType = VectorUDT())

# Create a new column with the sparse vectors
df = df.withColumn("sparse_vector", sparse_vector_udf("f_array", "c_array"))

# Show the result
df.show(truncate=False)

+---+------------+-----------+---+-------------------------------+
|id |f_array     |c_array    |len|sparse_vector                  |
+---+------------+-----------+---+-------------------------------+
|A  |[11, 16, 19]|[43, 16, 4]|20 |(20,[11,16,19],[43.0,16.0,4.0])|
|C  |[1, 2, 3]   |[1, 73, 26]|20 |(20,[1,2,3],[1.0,73.0,26.0])   |
|B  |[1, 2]      |[2, 1]     |20 |(20,[1,2],[2.0,1.0])           |
+---+------------+-----------+---+-------------------------------+

df.printSchema()

root
 |-- id: string (nullable = true)
 |-- f_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- c_array: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- len: integer (nullable = true)
 |-- sparse_vector: vector (nullable = true)
300x250
반응형