SparkMLlib构建机器学习回归模型

Python之王

共 22720字,需浏览 46分钟

 · 2024-04-11

3.2 回归问题

3.2.1 线性回归

Spark MLlib 的线性回归算法是一种广泛使用的预测模型。它将输入特征映射到连续的输出值。这是通过训练模型来确定最佳拟合线性函数的系数完成的。

以下是一个使用 Spark MLlib 实现线性回归的 Java 示例程序。

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LinearRegressionExample {

    public static void main(String[] args) {
        // 创建一个 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("LinearRegressionExample")
                .getOrCreate();

        // 读取数据集
        Dataset<Row> data = spark.read()
                .format("libsvm")
                .load("data/mllib/sample_linear_regression_data.txt");

        // 将数据集拆分为训练集和测试集
        Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
        Dataset<Row> trainingData = splits[0];

        // 将特征列合并到一个向量列中
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(new String[]{"features"})
                .setOutputCol("featuresVector");

        // 定义线性回归模型
        LinearRegression lr = new LinearRegression()
                .setMaxIter(10)
                .setRegParam(0.3)
                .setElasticNetParam(0.8);

//        // 将数据集拟合到线性回归模型中
        Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);

        //训练模型
        LinearRegressionModel lrModel = lr.fit(trainingDataWithFeatures);
        //打印线性回归的系数和截距
        System.out.println("系数Coefficients: "+lrModel.coefficients() + "");
        System.out.println(" 截距Intercept: " + lrModel.intercept()+ "");
        //总结训练集上的模型并打印出一些指标。
        LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
        Dataset<Row> dataset = trainingSummary.predictions().select("prediction""label""featuresVector");
        dataset.show(5);
        spark.stop();
    }
}
3.2.2 决策树回归

Spark MLlib 提供了决策树回归(Decision Tree Regression)算法来解决回归问题。决策树回归是一种基于树结构的非参数统计方法,能够处理多维输入和输出,并且具有良好的可解释性和鲁棒性。

下面是一个简单的 Java 代码示例,演示如何使用 Spark MLlib 的决策树回归算法对数据进行训练和预测:

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class DecisionTreeRegressionExample {
    public static void main(String[] args) {
        // 创建 SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("DecisionTreeRegressionExample")
                .master("local[*]")
                .getOrCreate();

        // 读取数据集
        Dataset<Row> data = spark.read().format("csv")
                .option("header""true")
                .option("inferSchema""true")
                .load("path/to/your/data.csv");

        // 定义特征列和标签列
        String[] featureCols = data.columns();
        String labelCol = "label";

        // 将特征列转换为向量
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(featureCols)
                .setOutputCol("features");

        Dataset<Row> dataWithFeatures = assembler.transform(data).select("features", labelCol);

        // 将数据集拆分为训练集和测试集
        double[] weights = {0.7, 0.3};
        Dataset<Row>[] datasets = dataWithFeatures.randomSplit(weights);
        Dataset<Row> trainData = datasets[0];
        Dataset<Row> testData = datasets[1];

        // 创建决策树回归器
        DecisionTreeRegressor dt = new DecisionTreeRegressor()
                .setLabelCol(labelCol)
                .setFeaturesCol("features");

        // 创建 Pipeline
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });

        // 训练模型
        PipelineModel model = pipeline.fit(trainData);

        // 预测测试集
        Dataset<Row> predictions = model.transform(testData);

        // 显示预测结果
        predictions.show();

        // 获取训练好的决策树模型
        DecisionTreeRegressionModel dtModel = (DecisionTreeRegressionModel) model.stages()[0];
        System.out.println("Learned regression tree model:\n" + dtModel.toDebugString());

        // 停止 SparkSession
        spark.stop();
    }
}

这只是一个简单的示例,实际应用中可能需要更复杂的特征工程和模型调整。另外,如果数据集过大,可能需要在集群上运行以获得更好的性能。

3.2.3 随机森林回归

Spark MLlib提供了随机森林回归算法,可以用于预测连续的数值型数据。随机森林是一种集成学习算法,它基于决策树,通过随机选择样本和特征来减少过拟合的风险。随机森林回归使用多个决策树对数据进行拟合和预测,并取这些决策树的平均值作为最终预测结果。本文将介绍如何使用Spark MLlib中的随机森林回归算法,并提供一个完整可运行的Java示例。

示例说明: 在这个示例中,我们将使用Spark MLlib中的随机森林回归算法,对一组汽车数据进行建模,然后使用模型来预测汽车的燃油效率(MPG)。我们将使用UCI Machine Learning Repository中的Auto MPG数据集。该数据集包含8个输入特征,如汽车的气缸数、排量、马力、重量等,以及一个输出特征MPG,表示汽车的燃油效率。我们将使用70%的数据来训练模型,30%的数据用于测试模型性能。

1.准备数据 我们需要下载Auto MPG数据集,将其保存为CSV文件,并将其加载到Spark DataFrame中。

数据集下载地址:

CSV文件格式如下:

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin
18.0,8,307.0,130.0,3504.0,12.0,70,1
15.0,8,350.0,165.0,3693.0,11.5,70,1

其中,第一列为输出特征MPG,后面的列为输入特征。

2.构建随机森林回归模型

// 创建随机森林回归模型
RandomForestRegressor rf = new RandomForestRegressor()
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setNumTrees(10);

// 训练模型
RandomForestRegressionModel model = rf.fit(trainingData);

3.使用模型进行预测

// 使用模型进行预测
Dataset<Row> predictions = model.transform(testData);

4.评估模型性能

// 评估模型性能
RegressionEvaluator evaluator = new RegressionEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);

下面是完整 Java 代码示例:

import org.apache.spark.SparkConf;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class RandomForestRegressionExample {
    public static void main(String[] args) {

        // Create a Spark session
        SparkConf conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        // Load data
        Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");

        // Split the data into training and test sets
        Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
        Dataset<Row> trainingData = splits[0];
        Dataset<Row> testData = splits[1];

        // Define the feature column names
        String[] featureCols = new String[data.schema().fieldNames().length - 1];
        for (int i = 0; i < featureCols.length; i++) {
            featureCols[i] = "feature" + (i + 1);
        }

        // Assemble features into a vector
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(featureCols)
                .setOutputCol("features");

        Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
        Dataset<Row> testDataWithFeatures = assembler.transform(testData);

        // Create a RandomForestRegressor model
        RandomForestRegressor rf = new RandomForestRegressor()
                .setLabelCol("label")
                .setFeaturesCol("features")
                .setMaxDepth(5)
                .setNumTrees(20);

        // Set up a pipeline
        Pipeline pipeline = new Pipeline().setStages(new RandomForestRegressor[]{rf});

        // Set up a grid of hyperparameters to search over using 3-fold cross validation
        ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
                .addGrid(rf.maxDepth(), new int[]{5, 10})
                .addGrid(rf.numTrees(), new int[]{20, 50});
        CrossValidator crossValidator = new CrossValidator()
                .setEstimator(pipeline)
                .setEvaluator(new RegressionEvaluator())
                .setEstimatorParamMaps(paramGridBuilder.build())
                .setNumFolds(3);

        // Train the model using cross-validation
        crossValidator.setSeed(12345);
        CrossValidatorModel crossValidatorModel = crossValidator.fit(trainingDataWithFeatures);

        // Evaluate the model on the test set
        Dataset<Row> predictions = crossValidatorModel.transform(testDataWithFeatures);
        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
                .setMetricName("rmse");
        double rmse = evaluator.evaluate(predictions);
        System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

        // Stop the Spark session
        spark.stop();
    }
}
3.2.4 梯度提升回归树

Spark MLlib提供了梯度提升回归树(Gradient-Boosted Trees,GBT)的算法,它是一种强大的回归模型,可以用于连续的数值预测。GBT在每一次迭代中,使用决策树模型去拟合残差值,然后将所有的模型的预测结果相加,得到最终的预测结果。

以下是一个使用Spark MLlib进行梯度提升回归树的示例Java程序。这个程序将使用一个数据集,该数据集包含了关于自行车租赁量的信息。它将使用梯度提升回归树来预测一天中自行车的租赁量。

import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.*;

public class GBTRegressionDemo {
    public static void main(String[] args) {

        // 创建 SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("GradientBoostedTreeRegressionDemo")
                .master("local[*]")
                .getOrCreate();

        // 读取数据集
        Dataset<Row> data = spark.read().format("libsvm")
                .load("data/sample_libsvm_data.txt");

        // 将数据集划分为训练集和测试集
        Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
        Dataset<Row> trainingData = splits[0];
        Dataset<Row> testData = splits[1];

        // 将特征向量合并为一个向量
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(trainingData.columns())
                .setOutputCol("features");
        Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
        Dataset<Row> testDataWithFeatures = assembler.transform(testData);

        // 创建梯度提升回归树模型
        GBTRegressor gbt = new GBTRegressor()
                .setLabelCol("label")
                .setFeaturesCol("features")
                .setMaxIter(10);

        // 训练模型
        GBTRegressionModel model = gbt.fit(trainingDataWithFeatures);

        // 在测试集上进行预测
        Dataset<Row> predictions = model.transform(testDataWithFeatures);

        // 评估模型
        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
                .setMetricName("rmse");
        double rmse = evaluator.evaluate(predictions);
        System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

        // 输出模型的节点信息
        System.out.println("Learned regression GBT model:\n" + model.toDebugString());

        // 关闭 SparkSession
        spark.close();
    }
}

该程序与之前的程序类似,只是将 DecisionTreeRegressor 类替换为 GBTRegressor 类。需要注意的是,GBTRegressor 类在设置参数时需要设置 maxIter 参数,表示最大迭代次数。同样需要用 setFeaturesCol 方法设置特征列,用 setLabelCol 方法设置标签列。最后需要调用 fit 方法训练模型,然后用 transform 方法在测试集上进行预测。最后用 RegressionEvaluator 类评估模型,并输出模型的节点信息。


浏览 4
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报