SparkMLlib构建机器学习回归模型

共 22720字,需浏览 46分钟

 ·

2024-04-11 14:53

3.2 回归问题

3.2.1 线性回归

Spark MLlib 的线性回归算法是一种广泛使用的预测模型。它将输入特征映射到连续的输出值。这是通过训练模型来确定最佳拟合线性函数的系数完成的。

以下是一个使用 Spark MLlib 实现线性回归的 Java 示例程序。

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LinearRegressionExample {

    public static void main(String[] args) {
        // 创建一个 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("LinearRegressionExample")
                .getOrCreate();

        // 读取数据集
        Dataset<Row> data = spark.read()
                .format("libsvm")
                .load("data/mllib/sample_linear_regression_data.txt");

        // 将数据集拆分为训练集和测试集
        Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
        Dataset<Row> trainingData = splits[0];

        // 将特征列合并到一个向量列中
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(new String[]{"features"})
                .setOutputCol("featuresVector");

        // 定义线性回归模型
        LinearRegression lr = new LinearRegression()
                .setMaxIter(10)
                .setRegParam(0.3)
                .setElasticNetParam(0.8);

//        // 将数据集拟合到线性回归模型中
        Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);

        //训练模型
        LinearRegressionModel lrModel = lr.fit(trainingDataWithFeatures);
        //打印线性回归的系数和截距
        System.out.println("系数Coefficients: "+lrModel.coefficients() + "");
        System.out.println(" 截距Intercept: " + lrModel.intercept()+ "");
        //总结训练集上的模型并打印出一些指标。
        LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
        Dataset<Row> dataset = trainingSummary.predictions().select("prediction""label""featuresVector");
        dataset.show(5);
        spark.stop();
    }
}
3.2.2 决策树回归

Spark MLlib 提供了决策树回归(Decision Tree Regression)算法来解决回归问题。决策树回归是一种基于树结构的非参数统计方法,能够处理多维输入和输出,并且具有良好的可解释性和鲁棒性。

下面是一个简单的 Java 代码示例,演示如何使用 Spark MLlib 的决策树回归算法对数据进行训练和预测:

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class DecisionTreeRegressionExample {
    public static void main(String[] args) {
        // 创建 SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("DecisionTreeRegressionExample")
                .master("local[*]")
                .getOrCreate();

        // 读取数据集
        Dataset<Row> data = spark.read().format("csv")
                .option("header""true")
                .option("inferSchema""true")
                .load("path/to/your/data.csv");

        // 定义特征列和标签列
        String[] featureCols = data.columns();
        String labelCol = "label";

        // 将特征列转换为向量
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(featureCols)
                .setOutputCol("features");

        Dataset<Row> dataWithFeatures = assembler.transform(data).select("features", labelCol);

        // 将数据集拆分为训练集和测试集
        double[] weights = {0.7, 0.3};
        Dataset<Row>[] datasets = dataWithFeatures.randomSplit(weights);
        Dataset<Row> trainData = datasets[0];
        Dataset<Row> testData = datasets[1];

        // 创建决策树回归器
        DecisionTreeRegressor dt = new DecisionTreeRegressor()
                .setLabelCol(labelCol)
                .setFeaturesCol("features");

        // 创建 Pipeline
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });

        // 训练模型
        PipelineModel model = pipeline.fit(trainData);

        // 预测测试集
        Dataset<Row> predictions = model.transform(testData);

        // 显示预测结果
        predictions.show();

        // 获取训练好的决策树模型
        DecisionTreeRegressionModel dtModel = (DecisionTreeRegressionModel) model.stages()[0];
        System.out.println("Learned regression tree model:\n" + dtModel.toDebugString());

        // 停止 SparkSession
        spark.stop();
    }
}

这只是一个简单的示例,实际应用中可能需要更复杂的特征工程和模型调整。另外,如果数据集过大,可能需要在集群上运行以获得更好的性能。

3.2.3 随机森林回归

Spark MLlib提供了随机森林回归算法,可以用于预测连续的数值型数据。随机森林是一种集成学习算法,它基于决策树,通过随机选择样本和特征来减少过拟合的风险。随机森林回归使用多个决策树对数据进行拟合和预测,并取这些决策树的平均值作为最终预测结果。本文将介绍如何使用Spark MLlib中的随机森林回归算法,并提供一个完整可运行的Java示例。

示例说明: 在这个示例中,我们将使用Spark MLlib中的随机森林回归算法,对一组汽车数据进行建模,然后使用模型来预测汽车的燃油效率(MPG)。我们将使用UCI Machine Learning Repository中的Auto MPG数据集。该数据集包含8个输入特征,如汽车的气缸数、排量、马力、重量等,以及一个输出特征MPG,表示汽车的燃油效率。我们将使用70%的数据来训练模型,30%的数据用于测试模型性能。

1.准备数据 我们需要下载Auto MPG数据集,将其保存为CSV文件,并将其加载到Spark DataFrame中。

数据集下载地址:

CSV文件格式如下:

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin
18.0,8,307.0,130.0,3504.0,12.0,70,1
15.0,8,350.0,165.0,3693.0,11.5,70,1

其中,第一列为输出特征MPG,后面的列为输入特征。

2.构建随机森林回归模型

// 创建随机森林回归模型
RandomForestRegressor rf = new RandomForestRegressor()
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setNumTrees(10);

// 训练模型
RandomForestRegressionModel model = rf.fit(trainingData);

3.使用模型进行预测

// 使用模型进行预测
Dataset<Row> predictions = model.transform(testData);

4.评估模型性能

// 评估模型性能
RegressionEvaluator evaluator = new RegressionEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);

下面是完整 Java 代码示例:

import org.apache.spark.SparkConf;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class RandomForestRegressionExample {
    public static void main(String[] args) {

        // Create a Spark session
        SparkConf conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // Load data
        Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");

        // Split the data into training and test sets
        Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
        Dataset<Row> trainingData = splits[0];
        Dataset<Row> testData = splits[1];

        // Define the feature column names
        String[] featureCols = new String[data.schema().fieldNames().length - 1];
        for (int i = 0; i < featureCols.length; i++) {
            featureCols[i] = "feature" + (i + 1);
        }

        // Assemble features into a vector
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(featureCols)
                .setOutputCol("features");

        Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
        Dataset<Row> testDataWithFeatures = assembler.transform(testData);

        // Create a RandomForestRegressor model
        RandomForestRegressor rf = new RandomForestRegressor()
                .setLabelCol("label")
                .setFeaturesCol("features")
                .setMaxDepth(5)
                .setNumTrees(20);

        // Set up a pipeline
        Pipeline pipeline = new Pipeline().setStages(new RandomForestRegressor[]{rf});

        // Set up a grid of hyperparameters to search over using 3-fold cross validation
        ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
                .addGrid(rf.maxDepth(), new int[]{5, 10})
                .addGrid(rf.numTrees(), new int[]{20, 50});
        CrossValidator crossValidator = new CrossValidator()
                .setEstimator(pipeline)
                .setEvaluator(new RegressionEvaluator())
                .setEstimatorParamMaps(paramGridBuilder.build())
                .setNumFolds(3);

        // Train the model using cross-validation
        crossValidator.setSeed(12345);
        CrossValidatorModel crossValidatorModel = crossValidator.fit(trainingDataWithFeatures);

        // Evaluate the model on the test set
        Dataset<Row> predictions = crossValidatorModel.transform(testDataWithFeatures);
        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
                .setMetricName("rmse");
        double rmse = evaluator.evaluate(predictions);
        System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

        // Stop the Spark session
        spark.stop();
    }
}
3.2.4 梯度提升回归树

Spark MLlib提供了梯度提升回归树(Gradient-Boosted Trees,GBT)的算法,它是一种强大的回归模型,可以用于连续的数值预测。GBT在每一次迭代中,使用决策树模型去拟合残差值,然后将所有的模型的预测结果相加,得到最终的预测结果。

以下是一个使用Spark MLlib进行梯度提升回归树的示例Java程序。这个程序将使用一个数据集,该数据集包含了关于自行车租赁量的信息。它将使用梯度提升回归树来预测一天中自行车的租赁量。

import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.*;

public class GBTRegressionDemo {
    public static void main(String[] args) {

        // 创建 SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("GradientBoostedTreeRegressionDemo")
                .master("local[*]")
                .getOrCreate();

        // 读取数据集
        Dataset<Row> data = spark.read().format("libsvm")
                .load("data/sample_libsvm_data.txt");

        // 将数据集划分为训练集和测试集
        Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
        Dataset<Row> trainingData = splits[0];
        Dataset<Row> testData = splits[1];

        // 将特征向量合并为一个向量
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(trainingData.columns())
                .setOutputCol("features");
        Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
        Dataset<Row> testDataWithFeatures = assembler.transform(testData);

        // 创建梯度提升回归树模型
        GBTRegressor gbt = new GBTRegressor()
                .setLabelCol("label")
                .setFeaturesCol("features")
                .setMaxIter(10);

        // 训练模型
        GBTRegressionModel model = gbt.fit(trainingDataWithFeatures);

        // 在测试集上进行预测
        Dataset<Row> predictions = model.transform(testDataWithFeatures);

        // 评估模型
        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
                .setMetricName("rmse");
        double rmse = evaluator.evaluate(predictions);
        System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

        // 输出模型的节点信息
        System.out.println("Learned regression GBT model:\n" + model.toDebugString());

        // 关闭 SparkSession
        spark.close();
    }
}

该程序与之前的程序类似,只是将 DecisionTreeRegressor 类替换为 GBTRegressor 类。需要注意的是,GBTRegressor 类在设置参数时需要设置 maxIter 参数,表示最大迭代次数。同样需要用 setFeaturesCol 方法设置特征列,用 setLabelCol 方法设置标签列。最后需要调用 fit 方法训练模型,然后用 transform 方法在测试集上进行预测。最后用 RegressionEvaluator 类评估模型,并输出模型的节点信息。


浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报