SparkMLlib构建机器学习回归模型
3.2 回归问题
3.2.1 线性回归
Spark MLlib 的线性回归算法是一种广泛使用的预测模型。它将输入特征映射到连续的输出值。这是通过训练模型来确定最佳拟合线性函数的系数完成的。
以下是一个使用 Spark MLlib 实现线性回归的 Java 示例程序。
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class LinearRegressionExample {
public static void main(String[] args) {
// 创建一个 SparkSession
SparkSession spark = SparkSession
.builder()
.appName("LinearRegressionExample")
.getOrCreate();
// 读取数据集
Dataset<Row> data = spark.read()
.format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt");
// 将数据集拆分为训练集和测试集
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
// 将特征列合并到一个向量列中
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"features"})
.setOutputCol("featuresVector");
// 定义线性回归模型
LinearRegression lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8);
// // 将数据集拟合到线性回归模型中
Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
//训练模型
LinearRegressionModel lrModel = lr.fit(trainingDataWithFeatures);
//打印线性回归的系数和截距
System.out.println("系数Coefficients: "+lrModel.coefficients() + "");
System.out.println(" 截距Intercept: " + lrModel.intercept()+ "");
//总结训练集上的模型并打印出一些指标。
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
Dataset<Row> dataset = trainingSummary.predictions().select("prediction", "label", "featuresVector");
dataset.show(5);
spark.stop();
}
}
3.2.2 决策树回归
Spark MLlib 提供了决策树回归(Decision Tree Regression)算法来解决回归问题。决策树回归是一种基于树结构的非参数统计方法,能够处理多维输入和输出,并且具有良好的可解释性和鲁棒性。
下面是一个简单的 Java 代码示例,演示如何使用 Spark MLlib 的决策树回归算法对数据进行训练和预测:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class DecisionTreeRegressionExample {
public static void main(String[] args) {
// 创建 SparkSession
SparkSession spark = SparkSession.builder()
.appName("DecisionTreeRegressionExample")
.master("local[*]")
.getOrCreate();
// 读取数据集
Dataset<Row> data = spark.read().format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("path/to/your/data.csv");
// 定义特征列和标签列
String[] featureCols = data.columns();
String labelCol = "label";
// 将特征列转换为向量
VectorAssembler assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features");
Dataset<Row> dataWithFeatures = assembler.transform(data).select("features", labelCol);
// 将数据集拆分为训练集和测试集
double[] weights = {0.7, 0.3};
Dataset<Row>[] datasets = dataWithFeatures.randomSplit(weights);
Dataset<Row> trainData = datasets[0];
Dataset<Row> testData = datasets[1];
// 创建决策树回归器
DecisionTreeRegressor dt = new DecisionTreeRegressor()
.setLabelCol(labelCol)
.setFeaturesCol("features");
// 创建 Pipeline
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });
// 训练模型
PipelineModel model = pipeline.fit(trainData);
// 预测测试集
Dataset<Row> predictions = model.transform(testData);
// 显示预测结果
predictions.show();
// 获取训练好的决策树模型
DecisionTreeRegressionModel dtModel = (DecisionTreeRegressionModel) model.stages()[0];
System.out.println("Learned regression tree model:\n" + dtModel.toDebugString());
// 停止 SparkSession
spark.stop();
}
}
这只是一个简单的示例,实际应用中可能需要更复杂的特征工程和模型调整。另外,如果数据集过大,可能需要在集群上运行以获得更好的性能。
3.2.3 随机森林回归
Spark MLlib提供了随机森林回归算法,可以用于预测连续的数值型数据。随机森林是一种集成学习算法,它基于决策树,通过随机选择样本和特征来减少过拟合的风险。随机森林回归使用多个决策树对数据进行拟合和预测,并取这些决策树的平均值作为最终预测结果。本文将介绍如何使用Spark MLlib中的随机森林回归算法,并提供一个完整可运行的Java示例。
示例说明: 在这个示例中,我们将使用Spark MLlib中的随机森林回归算法,对一组汽车数据进行建模,然后使用模型来预测汽车的燃油效率(MPG)。我们将使用UCI Machine Learning Repository中的Auto MPG数据集。该数据集包含8个输入特征,如汽车的气缸数、排量、马力、重量等,以及一个输出特征MPG,表示汽车的燃油效率。我们将使用70%的数据来训练模型,30%的数据用于测试模型性能。
1.准备数据 我们需要下载Auto MPG数据集,将其保存为CSV文件,并将其加载到Spark DataFrame中。
数据集下载地址:
CSV文件格式如下:
mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin
18.0,8,307.0,130.0,3504.0,12.0,70,1
15.0,8,350.0,165.0,3693.0,11.5,70,1
其中,第一列为输出特征MPG,后面的列为输入特征。
2.构建随机森林回归模型
// 创建随机森林回归模型
RandomForestRegressor rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(10);
// 训练模型
RandomForestRegressionModel model = rf.fit(trainingData);
3.使用模型进行预测
// 使用模型进行预测
Dataset<Row> predictions = model.transform(testData);
4.评估模型性能
// 评估模型性能
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
下面是完整 Java 代码示例:
import org.apache.spark.SparkConf;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class RandomForestRegressionExample {
public static void main(String[] args) {
// Create a Spark session
SparkConf conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
// Load data
Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");
// Split the data into training and test sets
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Define the feature column names
String[] featureCols = new String[data.schema().fieldNames().length - 1];
for (int i = 0; i < featureCols.length; i++) {
featureCols[i] = "feature" + (i + 1);
}
// Assemble features into a vector
VectorAssembler assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features");
Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
Dataset<Row> testDataWithFeatures = assembler.transform(testData);
// Create a RandomForestRegressor model
RandomForestRegressor rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxDepth(5)
.setNumTrees(20);
// Set up a pipeline
Pipeline pipeline = new Pipeline().setStages(new RandomForestRegressor[]{rf});
// Set up a grid of hyperparameters to search over using 3-fold cross validation
ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
.addGrid(rf.maxDepth(), new int[]{5, 10})
.addGrid(rf.numTrees(), new int[]{20, 50});
CrossValidator crossValidator = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator())
.setEstimatorParamMaps(paramGridBuilder.build())
.setNumFolds(3);
// Train the model using cross-validation
crossValidator.setSeed(12345);
CrossValidatorModel crossValidatorModel = crossValidator.fit(trainingDataWithFeatures);
// Evaluate the model on the test set
Dataset<Row> predictions = crossValidatorModel.transform(testDataWithFeatures);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
// Stop the Spark session
spark.stop();
}
}
3.2.4 梯度提升回归树
Spark MLlib提供了梯度提升回归树(Gradient-Boosted Trees,GBT)的算法,它是一种强大的回归模型,可以用于连续的数值预测。GBT在每一次迭代中,使用决策树模型去拟合残差值,然后将所有的模型的预测结果相加,得到最终的预测结果。
以下是一个使用Spark MLlib进行梯度提升回归树的示例Java程序。这个程序将使用一个数据集,该数据集包含了关于自行车租赁量的信息。它将使用梯度提升回归树来预测一天中自行车的租赁量。
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.*;
public class GBTRegressionDemo {
public static void main(String[] args) {
// 创建 SparkSession
SparkSession spark = SparkSession.builder()
.appName("GradientBoostedTreeRegressionDemo")
.master("local[*]")
.getOrCreate();
// 读取数据集
Dataset<Row> data = spark.read().format("libsvm")
.load("data/sample_libsvm_data.txt");
// 将数据集划分为训练集和测试集
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 将特征向量合并为一个向量
VectorAssembler assembler = new VectorAssembler()
.setInputCols(trainingData.columns())
.setOutputCol("features");
Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
Dataset<Row> testDataWithFeatures = assembler.transform(testData);
// 创建梯度提升回归树模型
GBTRegressor gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(10);
// 训练模型
GBTRegressionModel model = gbt.fit(trainingDataWithFeatures);
// 在测试集上进行预测
Dataset<Row> predictions = model.transform(testDataWithFeatures);
// 评估模型
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
// 输出模型的节点信息
System.out.println("Learned regression GBT model:\n" + model.toDebugString());
// 关闭 SparkSession
spark.close();
}
}
该程序与之前的程序类似,只是将 DecisionTreeRegressor
类替换为 GBTRegressor
类。需要注意的是,GBTRegressor
类在设置参数时需要设置 maxIter
参数,表示最大迭代次数。同样需要用 setFeaturesCol
方法设置特征列,用 setLabelCol
方法设置标签列。最后需要调用 fit
方法训练模型,然后用 transform
方法在测试集上进行预测。最后用 RegressionEvaluator
类评估模型,并输出模型的节点信息。