MLlib流设计,特征工程
共 15466字,需浏览 31分钟
·
2021-02-22 07:40
小编推荐
来源:子雨大数据
http://dblab.xmu.edu.cn/blog
【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
内容:MLlib流设计,特征工程
第 6 节 Spark MLlib
6.1 Spark MLlib简介 http://dblab.xmu.edu.cn/blog/1723-2/
(1)、Spark 机器学习库MLLib
MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容:
算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;
特征化工具:特征提取、转化、降维,和选择工具;
管道(Pipeline):用于构建、评估和调整机器学习管道的工具;
持久性:保存和加载算法,模型和管道;
实用工具:线性代数,统计,数据处理等工具。
Spark 机器学习库从 1.2 版本以后被分为两个包:
spark.mllib 包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。
spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。
使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。这种方式给我们提供了更灵活的方法,更符合机器学习过程的特点,也更容易从其他语言迁移。Spark官方推荐使用spark.ml。如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。开发者需要注意的是,从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。因此,我们将以ml包为主进行介绍。
Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:
6.2 机器学习工作流 http://dblab.xmu.edu.cn/blog/1763-2/
1、工作流(ML Pipelines)例子
(1)、引入 pyspark 库
Spark2.0以上版本的pyspark创建一个名为spark的SparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:
# 引入 pyspark 库
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
(2)、创建一个Pipeline
有了这些处理特定问题的转换器和评估器,接下来就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
(3)、产生一个PipelineModel
现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer。
model = pipeline.fit(training)
(4)、构建测试数据
我们可以看到,model的类型是一个PipelineModel,这个管道模型将在测试数据的时候使用。所以接下来,我们先构建测试数据。
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
(5)、生成我们所需要的预测结果
然后,我们调用我们训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的工作流,生成我们所需要的预测结果。
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
运行结果
(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000
6.3 特征抽取、转化和选择 http://dblab.xmu.edu.cn/blog/1709-2/
6.3.1、TF-IDF (HashingTF and IDF) http://dblab.xmu.edu.cn/blog/1766-2/
概念请看:
Datawhale-NLP实践——Task2.1-2.2 TF-IDF原理及实现 https://mp.weixin.qq.com/s?__biz=MzAxMTU5Njg4NQ==&mid=100000463&idx=2&sn=bba5bd250758e02fb1b4e065a236f4d4
1)、引入 pyspark 库
Spark2.0以上版本的pyspark创建一个名为spark的SparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:
# 引入 pyspark 库
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
2)、导入TFIDF所需要的包
首先,导入TFIDF所需要的包:
from pyspark.ml.feature import HashingTF,IDF,Tokenizer
3) 创建数据
准备工作完成后,我们创建一个简单的DataFrame,每一个句子代表一个文档。
sentenceData = spark.createDataFrame([(0, "I heard about Spark and I love Spark"),(0, "I wish Java could use case classes"),(1, "Logistic regression models are neat")]).toDF("label", "sentence")
4) 分词
在得到文档集合后,即可用tokenizer对句子进行分词
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
5)、把句子哈希成特征向量
得到分词后的文档序列后,即可使用HashingTF的transform()方法把句子哈希成特征向量,这里设置哈希表的桶数为2000。
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
可以看到,分词序列被变换成一个稀疏特征向量,其中每个单词都被散列成了一个不同的索引值,特征向量在某一维度上的值即该词汇在文档中出现的次数。
6)、训练模型
最后,使用IDF来对单纯的词频特征向量进行修正,使其更能体现不同词汇对文本的区别能力,IDF是一个Estimator,调用fit()方法并将词频向量传入,即产生一个IDFModel。
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
很显然,IDFModel是一个Transformer,调用它的transform()方法,即可得到每一个单词对应的TF-IDF度量值。
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(20,[0,5,9,13,17]...|
| 0|(20,[2,7,9,13,15]...|
| 1|(20,[4,6,13,15,18...|
+-----+--------------------+
可以看到,特征向量已经被其在语料库中出现的总次数进行了修正,通过TF-IDF得到的特征向量,在接下来可以被应用到相关的机器学习方法中。
6.3.2、特征抽取:Word2Vec http://dblab.xmu.edu.cn/blog/1766-2/
概念请看:
Task 6.1. 文本表示:从one-hot到word2vec https://mp.weixin.qq.com/s?__biz=MzAxMTU5Njg4NQ==&mid=100000893&idx=2&sn=3eed143ba5dd759dc61a496acf2f9496
# 引入 pyspark 库
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
from pyspark.ml.feature import Word2Vec
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
3)、模型构建
新建一个Word2Vec,显然,它是一个Estimator,设置相应的超参数,这里设置特征向量的维度为3,Word2Vec模型还有其他可设置的超参数,具体的超参数描述可以参见这里。
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
4)、读入训练数据,用fit()方法生成一个Word2VecModel
model = word2Vec.fit(documentDF)
5)、利用Word2VecModel把文档转变成特征向量
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
Text: [Hi, I, heard, about, Spark] =>
Vector: [-0.02481612414121628,0.030189220979809764,-0.043382836133241655]
Text: [I, wish, Java, could, use, case, classes] =>
Vector: [-0.015328743628093174,0.01850887880261455,0.041017367687475464]
Text: [Logistic, regression, models, are, neat] =>
Vector: [-0.020753448456525804,-0.046902849525213244,0.031090765446424487]
(3)、特征抽取–CountVectorizer http://dblab.xmu.edu.cn/blog/1769-2/
1)、 首先,导入CountVectorizer所需要的包
from pyspark.ml.feature import CountVectorizer
2)、假设我们有如下的DataFrame,其包含id和words两列,可以看成是一个包含两个文档的迷你语料库
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
3)、随后,通过CountVectorizer设定超参数,训练一个CountVectorizer,这里设定词汇表的最大量为3,设定词汇表中的词至少要在2个文档中出现过,以过滤那些偶然出现的词汇
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
4)、在训练结束后,可以通过cv对DataFrame进行fit,获得到模型的词汇表
model = cv.fit(df)
# 使用这一模型对DataFrame进行变换,可以得到文档的向量化表示
result = model.transform(df)
result.show(truncate=False)
+---+---------------+-------------------------+
|id |words |features |
+---+---------------+-------------------------+
|0 |[a, b, c] |(3,[0,1,2],[1.0,1.0,1.0])|
|1 |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+
for row in result.collect():
id,text, vector = row
print("Id:{%s}=> \nText: [%s] => \nVector: %s\n" % (str(id),", ".join(text), str(vector)))
Id:{0}=>
Text: [a, b, c] =>
Vector: (3,[0,1,2],[1.0,1.0,1.0])
Id:{1}=>
Text: [a, b, b, c, a] =>
Vector: (3,[0,1,2],[2.0,2.0,1.0])
(3)、特征变换–标签和索引的转化(Python版) http://dblab.xmu.edu.cn/blog/1770-2/
Spark ML包中提供了几个相关的转换器,例如:StringIndexer、IndexToString、OneHotEncoder、VectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于org.apache.spark.ml.feature包下。
值得注意的是,用于特征转换的转换器和其他的机器学习算法一样,也属于ML Pipeline模型的一部分,可以用来构成机器学习流水线,以StringIndexer为例,其存储着进行标签数值化过程的相关 超参数,是一个Estimator,对其调用fit(..)方法即可生成相应的模型StringIndexerModel类,很显然,它存储了用于DataFrame进行相关处理的 参数,是一个Transformer(其他转换器也是同一原理) 下面对几个常用的转换器依次进行介绍。
1)、StringIndexer
StringIndexer转换器可以把一列类别型的特征(或标签)进行编码,使其数值化,索引的范围从0开始,该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。
索引构建的顺序为标签的频率,优先编码频率较大的标签,所以出现频率最高的标签为0号。 如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码。
首先,引入必要的包,并创建一个简单的DataFrame,它只包含一个id列和一个标签列category:
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[ ],
[ ])
随后,我们创建一个StringIndexer对象,设定输入输出列名,其余参数采用默认值,并对这个DataFrame进行训练,产生StringIndexerModel对象:
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
随后即可利用该对象对DataFrame进行转换操作,可以看到,StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,即出现最多的“a”被编号成0,“c”为1,出现最少的“b”为0。
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 1.0|
| 3| a| 0.0|
| 4| a| 0.0|
| 5| c| 1.0|
+---+--------+-------------+
2)、IndexToString
与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。
其主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。
首先,和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,然后输出到“categoryIndex”列上,构建出新的DataFrame。
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
随后,创建IndexToString对象,读取“categoryIndex”上的标签索引,获得原有数据集的字符型标签,然后再输出到“originalCategory”列上。最后,通过输出“originalCategory”列,可以看到数据集中原有的字符标签。
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
converted.select("id", "categoryIndex", "originalCategory").show()
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
| 0| 0.0| a|
| 1| 2.0| b|
| 2| 1.0| c|
| 3| 0.0| a|
| 4| 0.0| a|
| 5| 1.0| c|
+---+-------------+----------------+
3)、OneHotEncoder
独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。
One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑斯蒂回归等。
首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
随后,我们创建OneHotEncoder对象对处理后的DataFrame进行编码,可以看见,编码后的二进制特征呈稀疏向量形式,与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
+---+--------+-------------+-------------+
| id|category|categoryIndex| categoryVec|
+---+--------+-------------+-------------+
| 0| a| 0.0|(2,[0],[1.0])|
| 1| b| 2.0| (2,[],[])|
| 2| c| 1.0|(2,[1],[1.0])|
| 3| a| 0.0|(2,[0],[1.0])|
| 4| a| 0.0|(2,[0],[1.0])|
| 5| c| 1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+
4)、VectorIndexer
之前介绍的StringIndexer是针对单个类别型特征进行转换,倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了VectorIndexer类来解决向量数据集中的类别性特征转换。
通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。
在下面的例子中,我们读入一个数据集,然后使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征,将类别特征转换为索引,这里设置maxCategories为10,即只有种类小10的特征才被认为是类别型特征,否则被认为是连续型特征:
from pyspark.ml.feature import VectorIndexer
data = spark.read.format('libsvm').load('../resources/sample_libsvm_data.txt')
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
indexedData = indexerModel.transform(data)
indexedData.show()
+-----+--------------------+--------------------+
|label| features| indexed|
+-----+--------------------+--------------------+
| 0.0|(692,[127,128,129...|(692,[127,128,129...|
| 1.0|(692,[158,159,160...|(692,[158,159,160...|
| 1.0|(692,[124,125,126...|(692,[124,125,126...|
| 1.0|(692,[152,153,154...|(692,[152,153,154...|
| 1.0|(692,[151,152,153...|(692,[151,152,153...|
| 0.0|(692,[129,130,131...|(692,[129,130,131...|
| 1.0|(692,[158,159,160...|(692,[158,159,160...|
| 1.0|(692,[99,100,101,...|(692,[99,100,101,...|
| 0.0|(692,[154,155,156...|(692,[154,155,156...|
| 0.0|(692,[127,128,129...|(692,[127,128,129...|
| 1.0|(692,[154,155,156...|(692,[154,155,156...|
| 0.0|(692,[153,154,155...|(692,[153,154,155...|
| 0.0|(692,[151,152,153...|(692,[151,152,153...|
| 1.0|(692,[129,130,131...|(692,[129,130,131...|
| 0.0|(692,[154,155,156...|(692,[154,155,156...|
| 1.0|(692,[150,151,152...|(692,[150,151,152...|
| 0.0|(692,[124,125,126...|(692,[124,125,126...|
| 0.0|(692,[152,153,154...|(692,[152,153,154...|
| 1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
| 1.0|(692,[124,125,126...|(692,[124,125,126...|
+-----+--------------------+--------------------+
(4)、特征选取–卡方选择器(Python版) http://dblab.xmu.edu.cn/blog/1771-2/
特征选择(Feature Selection)指的是在特征向量中选择出那些“优秀”的特征,组成新的、更“精简”的特征向量的过程。它在高维数据分析中十分常用,可以剔除掉“冗余”和“无关”的特征,提升学习器的性能。
特征选择方法和分类方法一样,也主要分为有监督(Supervised)和无监督(Unsupervised)两种,卡方选择则是统计学上常用的一种有监督特征选择方法,它通过对特征和真实标签之间进行卡方检验,来判断该特征和真实标签的关联程度,进而确定是否对其进行选择。
和ML库中的大多数学习方法一样,ML中的卡方选择也是以estimator+transformer的形式出现的,其主要由ChiSqSelector和ChiSqSelectorModel两个类来实现。
在进行实验前,首先进行环境的设置。引入卡方选择器所需要使用的类:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
# 默认名为spark的SparkSession已经创建。
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
result.show()
+---+------------------+-------+----------------+
| id| features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
| 7|[0.0,0.0,18.0,1.0]| 1.0| [18.0]|
| 8|[0.0,1.0,12.0,0.0]| 0.0| [12.0]|
| 9|[1.0,0.0,15.0,0.1]| 0.0| [15.0]|
+---+------------------+-------+----------------+
for row in result.collect():
id, features, clicked,selectedFeatures = row
features = list(map(str,features))
print("Id:{%s} => \nFeatures: [%s] => \n Clicked: %s => \n SelectedFeatures: %s\n" % (str(id),", ".join(features), str(clicked), str(selectedFeatures[0])))
Id:{7} =>
Features: [0.0, 0.0, 18.0, 1.0] =>
Clicked: 1.0 =>
SelectedFeatures: 18.0
Id:{8} =>
Features: [0.0, 1.0, 12.0, 0.0] =>
Clicked: 0.0 =>
SelectedFeatures: 12.0
Id:{9} =>
Features: [1.0, 0.0, 15.0, 0.1] =>
Clicked: 0.0 =>
SelectedFeatures: 15.0