우선, 모델의 학습에 필요한 train, test 셋이 있다는 가정하에 다음과 같이 3차 교호법(3 fold crossvaildaition)을 통해 로지스틱 모형을 학습하고 test 데이터 셋에 스코어링 하는 코드이다.
pyspark에서 로지스틱 모형의 파라미터 옵션은
https://runawayhorse001.github.io/LearningApacheSpark/reg.html
여기서 확인가능하다.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator
# 모델선언
lr = LogisticRegression(labelCol = "label", featuresCol = "features")
# 파라미터 그리드 지정
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 1.0])\
.addGrid(lr.elasticNetParam, [0.1, 0.5, 0.9])\
.build()
# metricName: areaUnderROC, areaUnderPR 2가지 지원
evaluator = BinaryClassificationEvaluator(labelCol = "label", metricName = "areaUnderPR")
# 5폴드 교차검증
cv = CrossValidator(
estimator = lr,
estimatorParamMaps = paramGrid,
evaluator = evaluator,
numFolds = 3
)
# 모델학습 및 평가
model = cv.fit(train)
model_prediction = model.transform(test)
학습된 모델을 저장하고 다시 불러올때는 LogisticRegressionModel 패키지를 import 해야한다.
from pyspark.ml.classification import LogisticRegressionModel
# Save the model to a specified path
model.save("hdfs://dsdspark/user/dsd/hshwang/model/logistic_test1")
# Assuming model is your trained logistic regression model
loaded_logi = LogisticRegressionModel.load("hdfs://dsdspark/user/dsd/hshwang/model/logistic_test1/bestModel") # Load the model
그리고 crossvalidaion 결과 지정된 metric 기준 어떤 파라미터에서 bestmodel 인지는 다음 코드로 확인할 수 있다.
# cv 결과 확인 - train에서 어떤 모델 선택 됐는지
import pandas as pd
params = [{p.name: v for p, v in m.items()} for m in model.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
{model.getEvaluator().getMetricName(): metric, **ps}
for ps, metric in zip(params, model.avgMetrics)
]).sort_values(by = "areaUnderPR", ascending = False)
그리고 학습된 로지스틱 모형에서 어떤 변수에 어떤 계수가 추정됐는지를 확인하는 코드는 아래와 같다.
# Get the coefficients of the model
coefficients = loaded_logi.coefficients
feature_index = [i for i in range(len(coefficients))]
# Create a dictionary to associate feature names with their coefficients
feature_importance = [(feature_index[i], float(coefficients[i])) for i in range(len(coefficients))]
schema = StructType([
StructField("index", IntegerType(), True),
StructField("coefficient", DoubleType(), True)
])
feature_importance_df = spark.createDataFrame(feature_importance, schema)
300x250
반응형
'프로그래밍 > PySpark' 카테고리의 다른 글
[PySpark] round(반올림), ceil(올림), floor(내림) 함수로 소수점 자리까지 다루기 (2) | 2023.09.15 |
---|---|
[PySpark] 특정(여러) 문자열(strings)이 포함된 데이터 필터로 뽑아내기. (0) | 2023.08.31 |
[PySpark] array_intersect로 array간 같은 value값 찾기 (0) | 2023.08.21 |
[PySpark] Union(= unionAll) 함수로 두 데이터 프레임 합치기 (0) | 2023.07.11 |
[PySpark] explode, explode_outer 함수 차이 (0) | 2023.07.07 |