在PySpark上使用XGBoost

SAMshare

共 4463字,需浏览 9分钟

 ·

2021-04-26 09:25


我这里提供一个pyspark的版本,参考了大家公开的版本。同时因为官网没有查看特征重要性的方法,所以自己写了一个方法。本方法没有保存模型,相信大家应该会。


from pyspark.conf import SparkConffrom pyspark.sql import SparkSessionimport pyspark.sql.functions as Ffrom pyspark.sql.types import FloatType,DoubleType,StringType,IntegerTypefrom pyspark.ml import Pipeline,PipelineModelfrom xparkxgb import XGBoostClassifier,XGBoostRegressorimport loggingfrom datetime import date,timedaltafrom pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,MinAMaxScaler,IndexToStringconf = SparkConf()\    .setExecutorEnv('','123')spark = SparkSession \    .builder \    .config(conf=conf)\    .appName('pyspark demo')    .getOrCreate()sc = spark.sparkContext


🎳 拉取数据

df = spark.sql("select *  from test_table where datadate='20200101'")#删除不要的字段df = df.drop("column2")#选择字段-num_feature:数值,cat_feature:分类值num_features = ["num1","num2"]cat_features = ["cat1","cat2"]label_columns = ["is_true_flag"]df = df[num_features,cat_features+label_columns  ]df = df.dropna()df = df.na.replace('','NA')df = df.fillna(0)#change data typefor col in cat_features:    df = df.withColumn(col,df[col].cast(StringType()))    for col in num_features:    df = df.withColumn(col,df[col].cast(DoubleType())) df = df.withColumn('is_true_flag',df['ist_true_flag'].cast(IntegerType()))


🎳 转onehot

#one-hot & standard scaler  stages = []for col in cat_features:    # 字符串转成索引    string_index = StringIndexer(inputCol = col, outputCol = col + 'Index')    # 转换为OneHot编码    encoder = OneHotEncoder(inputCol=string_index.getOutputCol(), outputCol=col + "_one_hot")    # 将每个字段的转换方式 放到stages中    stages += [string_index, encoder]
# 将income转换为索引label_string_index = StringIndexer(inputCol = 'is_true_flag', outputCol = 'label')# 添加到stages中stages += [label_string_index]
# 类别变量 + 数值变量assembler_cols = [c + "_one_hot" for c in cat_features] + num_featuresassembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")stages += [assembler]
# 使用pipeline完成数据处理pipeline = Pipeline(stages=stages)pipeline_model = pipeline.fit(df)df = pipeline_model.transform(df)train, test = df.randomSplit([0.7, 0.3], seed=2021)print(train.count())print(test.count())


🎳 创建模型

# 创建模型xgb = XGBoostClassifier(featuresCol = 'features', labelCol = 'label',predictionCol='predict_val',missing=0.0,numRound=50,numWorkers=10)preModel = xgb.fit(trainData)out1 = preModel.transform(testData)


🎳 查看训练效果

###训练效果##import pyspark.mllib.eveluation as evlr_results = out1.select(['predict_val','label']).rdd.map(lambda row:(row[0],row[1] * 1.0))lr_ev =ev.BinaryClassificationMetrics(lr_results)print (":Area under PR:{}".format(lr_ev.areaUnderPR))print (":Area under ROC:{}".format(lr_ev.areaUnderROC))tp = out1[(out.label == 1) & (out1.predict_val == 1)].count()tn = out1[(out.label == 0) & (out1.predict_val == 0)].count()fn = out1[(out.label == 0) & (out1.predict_val == 1)].count()fn = out1[(out.label == 1) & (out1.predict_val == 0)].count()print ('accuracy is : %f'%((tp+tn)/(tp+tn+fp+fn))) #准确率print ('recall is : %f'%((tp)/(tp+fn))) #召回率print ('precision is : %f'%((tp)/(tp+fp))) #精确率


🎳 特征解析

#特征解析df.schema['features'].metadatatemp = df.schema["features"].metadata["ml_attr"]["attrs"]df_importance = pd.DataFrame(columns=['idx', 'name'])for attr in temp['numeric']:    temp_df = {}    temp_df['idx'] = attr['idx']    temp_df['name'] = attr['name']    #print(temp_df)    df_importance = df_importance.append(temp_df, ignore_index=True)    #print(attr['idx'], attr['name'])    #print(attr)    #breakdf_importancefor attr in temp['binary']:    temp_df = {}    temp_df['idx'] = attr['idx']    temp_df['name'] = attr['name']    df_importance = df_importance.append(temp_df, ignore_index=True)df_importance#解析特征重要值FeatureScoreMap = preModel.nativeBooster.getScore("","gain")file_path ="C://Users//Administrator//Desktop//importance.csv"file  = open(file_path,"w+")print(FeatureScoreMap ,file = file)file.close()f1 = open(file_path)line = f1.readline()data=line.replace(',','\n').replace('->',',').replace('Map(','').replace(')','').replace('f','')file  = open(file_path,"w+")print(data,file = file)file.close()df_temp = pd.read_csv(file_path,header=None,names=["feature","weight"])df_importance = df_importance.merge(df_temp, left_on="feature", right_on="feature")df_importance.sort_values(by=['feature_importance'], ascending=False, inplace=True)df_importance


浏览 24
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报