프로그래밍/PySpark 15

[PySpark] 오버샘플링(oversampling), 언더샘플링(undersampling)

기계 학습에서 불균형 훈련 데이터 세트로 분류 문제를 다룰 때 오버 샘플링과 언더 샘플링은 결과를 개선하는 두 가지 쉽고 종종 효과적인 방법이다. 1.불균형(imbalanced) 데이터란? 데이터셋의 한 클래스(레이블) 수가 다른 클래스보다 상당히 많거나 적을때 데이터의 클래스가 불균형 상태라고 말한다. '불균형'이라는것이 상당히 분석가나 과학자의 입장에서 주관적인 판단이 개입된다. 그리고 실제 문제를 ML(머신러닝, 기계학습)으로 해결할 때 깔끔하게 균형잡힌 데이터를 만나기는 힘들다. (예를들어, 이상치 탐지, 사기탐지 데이터, 질병 데이터 등) 머신러닝 모델은 맹목적으로 다수의 클래스를 대부분 학습하기 때문에 대다수 예측 데이터를 학습된 다수의 클래스로 예측하려는 경향이 있다. 그렇기 때문에, 소수 ..

[PySpark] round(반올림), ceil(올림), floor(내림) 함수로 소수점 자리까지 다루기

PySpark를 사용하여 DataFrame의 특정 컬럼을 반올림, 올림 또는 내림하는 방법은 round(), ceil(), floor() 함수를 사용한다. PySpark의 ceil 및 floor 함수는 기본적으로 특정 소수 자릿수에서 동작하지 않는다. 특정 소수 자릿수에서 올림 또는 내림을 수행하려면 먼저 원하는 소수 자릿수로 값을 곱한 다음, 올림 또는 내림한 후에 다시 나누는 방법을 사용해야 한다. 즉, 원하는 자릿수 만큼 곱해서 올림이나 내린 다음에 다시 자릿수 만큼 나눠주는 작업이 필요하다. 0.참고사항 from pyspark.sql.functions import expr # DataFrame에서 "value" 컬럼을 소수점 둘째 자리로 올림 df = df.withColumn("ceiled_val..

[PySpark] 특정(여러) 문자열(strings)이 포함된 데이터 필터로 뽑아내기.

우선 데이터 프레임 내에서 한 string 스키마 구조의 컬럼에서 특정 문자열이 포함된 데이터를 filter로 걸러내는 코드는 아래와 같다. 아래에서는 search strings list에 있는 단어들을 포함하고 있는 데이터를 뽑아내는 방법이다. 여러 단어들이 아닌 한 단어를 포함하고 있는 데이터를 뽑고자 할때는 reduce 함수를 쓸 필요 없이 아래와 같이 pyspark 내장함수를 활용하면 된다. df.filter(col("col1").contains("기내용")) - 기내용이 포함된 데이터 산출 df.filter(~col("col1").contains("기내용")) - 기내용이 포함되지 않은 데이터 산출 from pyspark.sql import SparkSession from pyspark.sql...

[PySpark] 학습된 로지스틱 모형의 계수 확인하기.

우선, 모델의 학습에 필요한 train, test 셋이 있다는 가정하에 다음과 같이 3차 교호법(3 fold crossvaildaition)을 통해 로지스틱 모형을 학습하고 test 데이터 셋에 스코어링 하는 코드이다. pyspark에서 로지스틱 모형의 파라미터 옵션은 https://runawayhorse001.github.io/LearningApacheSpark/reg.html 10. Regularization — Learning Apache Spark with Python documentation 10. Regularization In mathematics, statistics, and computer science, particularly in the fields of machine learning..

[PySpark] array_intersect로 array간 같은 value값 찾기

다음과 같은 'data'라는 데이터 프레임이 있다. 그리고 데이터 프레임에 collect_list(site_app)이라는 array 구조의 컬럼이 있는데 해당 컬럼에서 list_A의 list value 값이 몇개가 있는지 궁금하다. data = [("oaid_1", ["app1", "app2", "app3"], [10, 20, 30], [1, 2, 3]), ("oaid_2", ["app2", "app4", "app5"], [15, 25, 35], [2, 4, 5]), ("oaid_3", ["app1", "app3", "app4"], [12, 22, 32], [1, 3, 4])] # Create a DataFrame with the sample data columns = ["oaid", "collect_l..

[PySpark] Union(= unionAll) 함수로 두 데이터 프레임 합치기

PySpark union() 및 unionAll() 함수는 동일한 스키마 또는 구조의 둘 이상의 DataFrame을 병합하는 데 사용된다. 그러면 union 과 unionAll 함수의 차이는 뭘 까? 결론적으로 말하자면 두 함수의 차이는 없다. Spark 2.0.0 버전부터는 unionAll()은 사용되지 않고 union으로 사용된다. 참고사항으로 SQL 언어에서 Union은 중복을 제거하지만 UnionAll은 중복 레코드를 포함한 두 개의 데이터 세트를 병합한다. PySpark에서는 둘 다 동일하게 동작하며 DataFrame 중복을 제거하기 위해선 duplicate() 함수를 사용하는 것이 좋다. import pyspark from pyspark.sql import SparkSession spark =..

[PySpark] explode, explode_outer 함수 차이

다음과 같은 스키마를 가진 테이블을 생성했다. an_array 라는 array 컬럼 하나와 a_map 으로 묶어진 key - value 컬럼이 존재한다 df = spark.createDataFrame( [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], ("id", "an_array", "a_map") ) df.show() +---+----------+----------+ | id| an_array| a_map| +---+----------+----------+ | 1|[foo, bar]|{x -> 1.0}| | 2| []| {}| | 3| null| null| +---+----------+----------+ df.printSchema()..

[PySpark] dense 벡터와 sparse 벡터, UDF로 sparse vector 만들기

1.vector 개념 희소 벡터를 생성하려면 벡터 길이(엄격하게 증가해야 하는 0이 아닌 값과 0이 아닌 값의 인덱스)를 제공해야 합니다. pyspark.mllib.linag.Vecotors 라이브러리는 dense(고밀도), sparse(희소) 두 유형의 로컬 벡터를 지원한다. 희소 벡터는 벡터안에 숫자가 0이 많은 경우에 사용한다. 희소 벡터를 생성하려면 벡터 길이와 0이 아닌 값과 0이 아닌 값의 인덱스 총 세가지의 인자를 넣어야 한다. tip! 일반적으로 ML 모델 feature 구성시 vector assembler를 활용해서 features를 하나의 벡터로 구성한다. Vector Assembler가 메모리를 적게 사용하는 형식 중 하나를 기준으로 dense 벡터와 sparse 벡터를 선택한다. 또..

[PySpark] array 값 합계 컬럼 생성하기

PySpark의 Column() 함수를 사용하여 열에 있는 배열 값의 합계를 계산하려면 expr() 함수를 aggregate() 함수와 함께 사용할 수 있다. from pyspark.sql import SparkSession from pyspark.sql.functions import col, expr, aggregate # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Create a sample DataFrame data = [("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])] df = spark.createDataFrame(data, ["Name", "Numbe..

[PySpark] 데이터프레임 값을 리스트로 반환하기

pyspark 데이터프레임의 특정 column의 value들을 리스트로 반환하는 방법이다. 여기서 x[2]는 cat2 column을 지칭하고 고유값이 아니라 전체 value를 list로 반환하려면 distinct() 없이 작업을 하면 된다. df2.show(3) #+---+----+----+--------+ #|ind| id|cat2|cat2_cnt| #+---+----+----+--------+ #| 1|2868| 167| 16| #| 2|1737| 157| 24| #| 3|1476| 189| 3| #+---+----+----+--------+ #only showing top 3 rows cat2_list = df2.rdd.map(lambda x: x[2]).distinct().collect() cat..