在PySpark上使用XGBoost
这里提供一个 PySpark 版本的实现,参考了网上公开的版本。由于官方文档没有提供查看特征重要性的方法,所以自己写了一个。本文没有包含保存模型的步骤,相信大家应该会。
import logging
from datetime import date, timedelta

import pyspark.sql.functions as F
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (
    IndexToString,
    MinMaxScaler,
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, FloatType, IntegerType, StringType
# NOTE(review): the XGBoost4J-Spark Python wrapper is commonly shipped as
# `sparkxgb`; confirm that `xparkxgb` is the module name actually deployed.
from xparkxgb import XGBoostClassifier, XGBoostRegressor

# Spark configuration. The executor environment entry below has an empty
# variable name, so it looks like a placeholder -- TODO confirm the real
# key/value before running on a cluster.
conf = SparkConf().setExecutorEnv('', '123')

# Build (or reuse) the session. The chain is wrapped in parentheses so that
# every builder call is part of the same expression.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName('pyspark demo')
    .getOrCreate()
)
sc = spark.sparkContext
🎳 拉取数据
# Load one day's partition of the source table.
df = spark.sql("select * from test_table where datadate='20200101'")
# Drop the column that is not needed.
df = df.drop("column2")
# Column roles: num_features are numeric, cat_features are categorical.
num_features = ["num1", "num2"]
cat_features = ["cat1", "cat2"]
label_columns = ["is_true_flag"]
# Keep only the modelling columns. select() takes a single flat list of
# names; indexing the DataFrame with a tuple of two lists is not a valid
# selection.
df = df.select(num_features + cat_features + label_columns)
# Remove rows containing nulls, then map empty strings to 'NA' and fill any
# remaining numeric nulls with 0.
df = df.dropna()
df = df.na.replace('', 'NA')
df = df.fillna(0)
# Normalise column types: categorical -> string, numeric -> double,
# label -> integer.
for name in cat_features:
    df = df.withColumn(name, df[name].cast(StringType()))
for name in num_features:
    df = df.withColumn(name, df[name].cast(DoubleType()))
df = df.withColumn('is_true_flag', df['is_true_flag'].cast(IntegerType()))
🎳 转onehot
# Feature engineering: index and one-hot encode each categorical column,
# index the label, then assemble all inputs into a single feature vector.
stages = []
for name in cat_features:
    # Map the string category to a numeric index.
    indexer = StringIndexer(inputCol=name, outputCol=name + 'Index')
    # Turn the index into a one-hot encoded vector.
    encoder = OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=name + "_one_hot")
    # Register both transformations for this column.
    stages += [indexer, encoder]
# Index the target column into 'label'. The default ordering assigns index 0
# to the most frequent value, so 'label == 1' would not necessarily mean
# 'is_true_flag == 1'; alphabetical ordering keeps 0 -> 0.0 and 1 -> 1.0.
label_string_index = StringIndexer(
    inputCol='is_true_flag',
    outputCol='label',
    stringOrderType='alphabetAsc',
)
stages += [label_string_index]
# Model inputs: the one-hot encoded categorical columns plus the numeric ones.
assembler_cols = [c + "_one_hot" for c in cat_features] + num_features
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]
# Fit the preprocessing pipeline and apply it to the data.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df = pipeline_model.transform(df)
# 70/30 train/test split with a fixed seed for reproducibility.
train, test = df.randomSplit([0.7, 0.3], seed=2021)
print(train.count())
print(test.count())
🎳 创建模型
# Train the XGBoost classifier on the training split, then score the test
# split. `train` and `test` are the names produced by the randomSplit call.
xgb = XGBoostClassifier(
    featuresCol='features',
    labelCol='label',
    predictionCol='predict_val',
    missing=0.0,
    numRound=50,
    numWorkers=10,
)
preModel = xgb.fit(train)
out1 = preModel.transform(test)
🎳 查看训练效果
# Evaluate the predictions on the test split.
import pyspark.mllib.evaluation as ev


def _ratio(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    return numerator / denominator if denominator else float('nan')


# BinaryClassificationMetrics expects an RDD of (score, label) pairs.
# NOTE(review): the predicted class is used as the score here, so the curves
# have a single operating point -- consider using the model's probability
# output instead.
lr_results = out1.select(['predict_val', 'label']).rdd.map(lambda row: (row[0], row[1] * 1.0))
lr_ev = ev.BinaryClassificationMetrics(lr_results)
print(":Area under PR:{}".format(lr_ev.areaUnderPR))
print(":Area under ROC:{}".format(lr_ev.areaUnderROC))

# Confusion-matrix cells (label, prediction).
tp = out1[(out1.label == 1) & (out1.predict_val == 1)].count()
tn = out1[(out1.label == 0) & (out1.predict_val == 0)].count()
fp = out1[(out1.label == 0) & (out1.predict_val == 1)].count()
fn = out1[(out1.label == 1) & (out1.predict_val == 0)].count()

print('accuracy is : %f' % _ratio(tp + tn, tp + tn + fp + fn))  # accuracy
print('recall is : %f' % _ratio(tp, tp + fn))  # recall
print('precision is : %f' % _ratio(tp, tp + fp))  # precision
🎳 特征解析
# Feature importance: map each position of the assembled feature vector back
# to its column name and join it with the booster's gain scores.
import re

import pandas as pd


def _parse_feature_scores(score_text):
    """Extract (feature_index, score) pairs from the printed score map.

    The text is expected to contain entries of the form ``f<idx> -> <score>``;
    anything else in the text is ignored.
    """
    return [
        (int(idx), float(score))
        for idx, score in re.findall(r'f(\d+)\s*->\s*([^,\s)]+)', score_text)
    ]


# The assembled vector's metadata lists its slots grouped by attribute kind;
# each entry carries the slot index and the originating feature name.
attrs = df.schema["features"].metadata["ml_attr"]["attrs"]
df_importance = pd.DataFrame(
    [
        {'idx': attr['idx'], 'name': attr['name']}
        for kind in ('numeric', 'binary', 'nominal')
        for attr in attrs.get(kind, [])
    ],
    columns=['idx', 'name'],
)

# Gain-based scores from the trained booster. The result is parsed from its
# string form in memory, so no scratch file is needed.
FeatureScoreMap = preModel.nativeBooster.getScore("", "gain")
df_temp = pd.DataFrame(
    _parse_feature_scores(str(FeatureScoreMap)),
    columns=["feature", "weight"],
).astype({"feature": "int64", "weight": "float64"})

# Join on the vector slot index; only features present in the score map
# remain after this inner merge.
df_importance = df_importance.merge(df_temp, left_on="idx", right_on="feature")
df_importance.sort_values(by=['weight'], ascending=False, inplace=True)
df_importance
评论