MLlib流设计,特征工程

DayNightStudy

共 15466字,需浏览 31分钟

 ·

2021-02-22 07:40

小编推荐

来源:子雨大数据

http://dblab.xmu.edu.cn/blog



【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!


内容:MLlib流设计,特征工程

第 6 节 Spark MLlib


6.1 Spark MLlib简介
http://dblab.xmu.edu.cn/blog/1723-2/

(1)、Spark 机器学习库MLLib


MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容:

  1. 算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;

  2. 特征化工具:特征提取、转化、降维,和选择工具;

  3. 管道(Pipeline):用于构建、评估和调整机器学习管道的工具;

  4. 持久性:保存和加载算法,模型和管道;

  5. 实用工具:线性代数,统计,数据处理等工具。

Spark 机器学习库从 1.2 版本以后被分为两个包:

  • spark.mllib 包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。

  • spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。

使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。这种方式给我们提供了更灵活的方法,更符合机器学习过程的特点,也更容易从其他语言迁移。Spark官方推荐使用spark.ml。如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。开发者需要注意的是,从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。因此,我们将以ml包为主进行介绍。

Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:

6.2 机器学习工作流
http://dblab.xmu.edu.cn/blog/1763-2/

1、工作流(ML Pipelines)例子

(1)、引入 pyspark 库

Spark2.0以上版本的pyspark创建一个名为spark的SparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:

# 引入 pyspark 库from pyspark.sql import SparkSessionspark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()


from pyspark.ml import Pipelinefrom pyspark.ml.classification import LogisticRegressionfrom pyspark.ml.feature import HashingTF, Tokenizer
# Prepare training documents from a list of (id, text, label) tuples.training = spark.createDataFrame([    (0, "a b c d e spark", 1.0),    (1, "b d", 0.0),    (2, "spark f g h", 1.0),    (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")lr = LogisticRegression(maxIter=10, regParam=0.001)

(2)、创建一个Pipeline

有了这些处理特定问题的转换器和评估器,接下来就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

(3)、产生一个PipelineModel

现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer。

model = pipeline.fit(training)

(4)、构建测试数据

我们可以看到,model的类型是一个PipelineModel,这个管道模型将在测试数据的时候使用。所以接下来,我们先构建测试数据。

test = spark.createDataFrame([    (4, "spark i j k"),    (5, "l m n"),    (6, "spark hadoop spark"),    (7, "apache hadoop")], ["id", "text"])

(5)、生成我们所需要的预测结果

然后,我们调用我们训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的工作流,生成我们所需要的预测结果。

prediction = model.transform(test)selected = prediction.select("id", "text", "probability", "prediction")for row in selected.collect():    rid, text, prob, prediction = row    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

运行结果

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000
6.3 特征抽取、转化和选择
http://dblab.xmu.edu.cn/blog/1709-2/
6.3.1、TF-IDF (HashingTF and IDF)
http://dblab.xmu.edu.cn/blog/1766-2/


概念请看:

Datawhale-NLP实践——Task2.1-2.2 TF-IDF原理及实现
​https://mp.weixin.qq.com/s?__biz=MzAxMTU5Njg4NQ==&mid=100000463&idx=2&sn=bba5bd250758e02fb1b4e065a236f4d4

1)、引入 pyspark 库

Spark2.0以上版本的pyspark创建一个名为spark的SparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:

# 引入 pyspark 库from pyspark.sql import SparkSessionspark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()

2)、导入TFIDF所需要的包

首先,导入TFIDF所需要的包:

from pyspark.ml.feature import HashingTF,IDF,Tokenizer

3) 创建数据

准备工作完成后,我们创建一个简单的DataFrame,每一个句子代表一个文档。

sentenceData = spark.createDataFrame([(0, "I heard about Spark and I love Spark"),(0, "I wish Java could use case classes"),(1, "Logistic regression models are neat")]).toDF("label", "sentence")

4) 分词

在得到文档集合后,即可用tokenizer对句子进行分词

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")wordsData = tokenizer.transform(sentenceData)

5)、把句子哈希成特征向量

得到分词后的文档序列后,即可使用HashingTF的transform()方法把句子哈希成特征向量,这里设置哈希表的桶数为2000。

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)featurizedData = hashingTF.transform(wordsData)

可以看到,分词序列被变换成一个稀疏特征向量,其中每个单词都被散列成了一个不同的索引值,特征向量在某一维度上的值即该词汇在文档中出现的次数。

6)、训练模型

最后,使用IDF来对单纯的词频特征向量进行修正,使其更能体现不同词汇对文本的区别能力,IDF是一个Estimator,调用fit()方法并将词频向量传入,即产生一个IDFModel。

idf = IDF(inputCol="rawFeatures", outputCol="features")idfModel = idf.fit(featurizedData)

很显然,IDFModel是一个Transformer,调用它的transform()方法,即可得到每一个单词对应的TF-IDF度量值。


rescaledData = idfModel.transform(featurizedData)rescaledData.select("label", "features").show()
+-----+--------------------+|label|            features|+-----+--------------------+|    0|(20,[0,5,9,13,17]...||    0|(20,[2,7,9,13,15]...||    1|(20,[4,6,13,15,18...|+-----+--------------------+

可以看到,特征向量已经被其在语料库中出现的总次数进行了修正,通过TF-IDF得到的特征向量,在接下来可以被应用到相关的机器学习方法中。


6.3.2、特征抽取:Word2Vec
http://dblab.xmu.edu.cn/blog/1766-2/

概念请看:

 Task 6.1. 文本表示:从one-hot到word2vec
https://mp.weixin.qq.com/s?__biz=MzAxMTU5Njg4NQ==&mid=100000893&idx=2&sn=3eed143ba5dd759dc61a496acf2f9496
# 引入 pyspark 库from pyspark.sql import SparkSessionspark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()from pyspark.ml.feature import Word2Vec
documentDF = spark.createDataFrame([    ("Hi I heard about Spark".split(" "), ),    ("I wish Java could use case classes".split(" "), ),    ("Logistic regression models are neat".split(" "), )], ["text"])
3)、模型构建

新建一个Word2Vec,显然,它是一个Estimator,设置相应的超参数,这里设置特征向量的维度为3,Word2Vec模型还有其他可设置的超参数,具体的超参数描述可以参见这里。


word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
4)、读入训练数据,用fit()方法生成一个Word2VecModel


model = word2Vec.fit(documentDF)
5)、利用Word2VecModel把文档转变成特征向量
result = model.transform(documentDF)for row in result.collect():    text, vector = row    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
Text: [Hi, I, heard, about, Spark] => Vector: [-0.02481612414121628,0.030189220979809764,-0.043382836133241655]
Text: [I, wish, Java, could, use, case, classes] => Vector: [-0.015328743628093174,0.01850887880261455,0.041017367687475464]
Text: [Logistic, regression, models, are, neat] => Vector: [-0.020753448456525804,-0.046902849525213244,0.031090765446424487]


(3)、特征抽取–CountVectorizer
http://dblab.xmu.edu.cn/blog/1769-2/
1)、 首先,导入CountVectorizer所需要的包


from pyspark.ml.feature import CountVectorizer
2)、假设我们有如下的DataFrame,其包含id和words两列,可以看成是一个包含两个文档的迷你语料库
df = spark.createDataFrame([    (0, "a b c".split(" ")),    (1, "a b b c a".split(" "))], ["id", "words"])
3)、随后,通过CountVectorizer设定超参数,训练一个CountVectorizer,这里设定词汇表的最大量为3,设定词汇表中的词至少要在2个文档中出现过,以过滤那些偶然出现的词汇
# fit a CountVectorizerModel from the corpus.cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
4)、在训练结束后,可以通过cv对DataFrame进行fit,获得到模型的词汇表
model = cv.fit(df)# 使用这一模型对DataFrame进行变换,可以得到文档的向量化表示
result = model.transform(df)result.show(truncate=False)
+---+---------------+-------------------------+|id |words          |features                 |+---+---------------+-------------------------+|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])||1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|+---+---------------+-------------------------+
for row in result.collect():    id,text, vector = row    print("Id:{%s}=> \nText: [%s] => \nVector: %s\n" % (str(id),", ".join(text), str(vector)))
Id:{0}=> Text: [a, b, c] => Vector: (3,[0,1,2],[1.0,1.0,1.0])
Id:{1}=> Text: [a, b, b, c, a] => Vector: (3,[0,1,2],[2.0,2.0,1.0])


(3)、特征变换–标签和索引的转化(Python版)
http://dblab.xmu.edu.cn/blog/1770-2/

Spark ML包中提供了几个相关的转换器,例如:StringIndexer、IndexToString、OneHotEncoder、VectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于org.apache.spark.ml.feature包下。

值得注意的是,用于特征转换的转换器和其他的机器学习算法一样,也属于ML Pipeline模型的一部分,可以用来构成机器学习流水线,以StringIndexer为例,其存储着进行标签数值化过程的相关 超参数,是一个Estimator,对其调用fit(..)方法即可生成相应的模型StringIndexerModel类,很显然,它存储了用于DataFrame进行相关处理的 参数,是一个Transformer(其他转换器也是同一原理) 下面对几个常用的转换器依次进行介绍。


1)、StringIndexer


StringIndexer转换器可以把一列类别型的特征(或标签)进行编码,使其数值化,索引的范围从0开始,该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。

索引构建的顺序为标签的频率,优先编码频率较大的标签,所以出现频率最高的标签为0号。 如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码。

首先,引入必要的包,并创建一个简单的DataFrame,它只包含一个id列和一个标签列category:


from pyspark.ml.feature import StringIndexer df = spark.createDataFrame(    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],    ["id", "category"])

随后,我们创建一个StringIndexer对象,设定输入输出列名,其余参数采用默认值,并对这个DataFrame进行训练,产生StringIndexerModel对象:

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")model = indexer.fit(df)

随后即可利用该对象对DataFrame进行转换操作,可以看到,StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,即出现最多的“a”被编号成0,“c”为1,出现最少的“b”为0。

+---+--------+-------------+| id|category|categoryIndex|+---+--------+-------------+|  0|       a|          0.0||  1|       b|          2.0||  2|       c|          1.0||  3|       a|          0.0||  4|       a|          0.0||  5|       c|          1.0|+---+--------+-------------+
2)、IndexToString

与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。

其主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。

首先,和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,然后输出到“categoryIndex”列上,构建出新的DataFrame。

from pyspark.ml.feature import IndexToString, StringIndexer df = spark.createDataFrame(    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],    ["id", "category"]) indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")model = indexer.fit(df)indexed = model.transform(df)

随后,创建IndexToString对象,读取“categoryIndex”上的标签索引,获得原有数据集的字符型标签,然后再输出到“originalCategory”列上。最后,通过输出“originalCategory”列,可以看到数据集中原有的字符标签。

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")converted = converter.transform(indexed)converted.select("id", "categoryIndex", "originalCategory").show()
+---+-------------+----------------+| id|categoryIndex|originalCategory|+---+-------------+----------------+|  0|          0.0|               a||  1|          2.0|               b||  2|          1.0|               c||  3|          0.0|               a||  4|          0.0|               a||  5|          1.0|               c|+---+-------------+----------------+
3)、OneHotEncoder

独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。

One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑斯蒂回归等。

首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:


from pyspark.ml.feature import OneHotEncoder, StringIndexer df = spark.createDataFrame([    (0, "a"),    (1, "b"),    (2, "c"),    (3, "a"),    (4, "a"),    (5, "c")], ["id", "category"]) stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")model = stringIndexer.fit(df)indexed = model.transform(df)


随后,我们创建OneHotEncoder对象对处理后的DataFrame进行编码,可以看见,编码后的二进制特征呈稀疏向量形式,与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")encoded = encoder.transform(indexed)encoded.show()

+---+--------+-------------+-------------+| id|category|categoryIndex|  categoryVec|+---+--------+-------------+-------------+|  0|        a|          0.0|(2,[0],[1.0])||  1|        b|          2.0|    (2,[],[])||  2|        c|          1.0|(2,[1],[1.0])||  3|        a|          0.0|(2,[0],[1.0])||  4|        a|          0.0|(2,[0],[1.0])||  5|        c|          1.0|(2,[1],[1.0])|+---+--------+-------------+-------------+
4)、VectorIndexer

之前介绍的StringIndexer是针对单个类别型特征进行转换,倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了VectorIndexer类来解决向量数据集中的类别性特征转换。

通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。

在下面的例子中,我们读入一个数据集,然后使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征,将类别特征转换为索引,这里设置maxCategories为10,即只有种类小10的特征才被认为是类别型特征,否则被认为是连续型特征:

from pyspark.ml.feature import VectorIndexerdata = spark.read.format('libsvm').load('../resources/sample_libsvm_data.txt')indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)indexerModel = indexer.fit(data)categoricalFeatures = indexerModel.categoryMapsindexedData = indexerModel.transform(data)indexedData.show()

+-----+--------------------+--------------------+|label|            features|             indexed|+-----+--------------------+--------------------+|  0.0|(692,[127,128,129...|(692,[127,128,129...||  1.0|(692,[158,159,160...|(692,[158,159,160...||  1.0|(692,[124,125,126...|(692,[124,125,126...||  1.0|(692,[152,153,154...|(692,[152,153,154...||  1.0|(692,[151,152,153...|(692,[151,152,153...||  0.0|(692,[129,130,131...|(692,[129,130,131...||  1.0|(692,[158,159,160...|(692,[158,159,160...||  1.0|(692,[99,100,101,...|(692,[99,100,101,...||  0.0|(692,[154,155,156...|(692,[154,155,156...||  0.0|(692,[127,128,129...|(692,[127,128,129...||  1.0|(692,[154,155,156...|(692,[154,155,156...||  0.0|(692,[153,154,155...|(692,[153,154,155...||  0.0|(692,[151,152,153...|(692,[151,152,153...||  1.0|(692,[129,130,131...|(692,[129,130,131...||  0.0|(692,[154,155,156...|(692,[154,155,156...||  1.0|(692,[150,151,152...|(692,[150,151,152...||  0.0|(692,[124,125,126...|(692,[124,125,126...||  0.0|(692,[152,153,154...|(692,[152,153,154...||  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...||  1.0|(692,[124,125,126...|(692,[124,125,126...|+-----+--------------------+--------------------+


(4)、特征选取–卡方选择器(Python版)
http://dblab.xmu.edu.cn/blog/1771-2/

特征选择(Feature Selection)指的是在特征向量中选择出那些“优秀”的特征,组成新的、更“精简”的特征向量的过程。它在高维数据分析中十分常用,可以剔除掉“冗余”和“无关”的特征,提升学习器的性能。

特征选择方法和分类方法一样,也主要分为有监督(Supervised)和无监督(Unsupervised)两种,卡方选择则是统计学上常用的一种有监督特征选择方法,它通过对特征和真实标签之间进行卡方检验,来判断该特征和真实标签的关联程度,进而确定是否对其进行选择。

和ML库中的大多数学习方法一样,ML中的卡方选择也是以estimator+transformer的形式出现的,其主要由ChiSqSelector和ChiSqSelectorModel两个类来实现。

在进行实验前,首先进行环境的设置。引入卡方选择器所需要使用的类:

from pyspark.ml.feature import ChiSqSelectorfrom pyspark.ml.linalg import Vectors# 默认名为spark的SparkSession已经创建。

df = spark.createDataFrame([    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)result.show()
+---+------------------+-------+----------------+| id|          features|clicked|selectedFeatures|+---+------------------+-------+----------------+|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]||  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]||  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|+---+------------------+-------+----------------+

for row in result.collect():    id, features, clicked,selectedFeatures  = row    features = list(map(str,features))    print("Id:{%s} => \nFeatures: [%s] => \n Clicked: %s => \n SelectedFeatures: %s\n" % (str(id),", ".join(features), str(clicked), str(selectedFeatures[0])))
Id:{7} => Features: [0.0, 0.0, 18.0, 1.0] =>  Clicked: 1.0 =>  SelectedFeatures: 18.0
Id:{8} => Features: [0.0, 1.0, 12.0, 0.0] => Clicked: 0.0 => SelectedFeatures: 12.0
Id:{9} => Features: [1.0, 0.0, 15.0, 0.1] => Clicked: 0.0 => SelectedFeatures: 15.0


浏览 28
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报