RDD编程,熟悉算子,读写文件
小编推荐
来源:子雨大数据
http://dblab.xmu.edu.cn/blog
内容:RDD编程,熟悉算子,读写文件
0 准备工作
1.启动 pyspark
from pyspark import SparkContext
sc = SparkContext( 'local', 'test')
2.目录下创建 word.txt 和 person.json,内容分别为
面 第 一 行 首 先 从 外 部 文 件 d a t a . t x t 中 构 建 得 到 一 个 R D D ,
名 称 为 l i n e s
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
第3章 Spark编程基础
3.1 Spark入门:RDD编程 http://dblab.xmu.edu.cn/blog/1700-2/
(1) 从文件系统中加载数据创建RDD
textFile = sc.textFile("E:/pythonWp/sparkWP/wordCount/word.txt")
wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
wordCount.collect()
运行效果:
[('This', 1),
('is', 1),
....
('and', 1),
('python', 1)]
(2) 通过并行集合(数组)创建RDD
nums = [1,2,3,4,5]
rdd = sc.parallelize(nums)
# 上面使用列表来创建。在Python中并没有数组这个基本数据类型,
# 为了便于理解,你可以把列表当成其他语言的数组。
(3) RDD操作
RDD被创建好以后,在后续使用过程中一般会发生两种操作:
转换(Transformation): 基于现有的数据集创建一个新的数据集。
行动(Action):在数据集上进行运算,返回计算值。
1) 转换操作
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
下面列出一些常见的转换操作(Transformation API):
filter(func):筛选出满足函数func的元素,并返回一个新的数据集
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
2) 行动操作
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。 下面列出一些常见的行动操作(Action API):
count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行*
(3) 惰性机制
这里给出一段简单的代码来解释Spark的惰性机制。
lines = sc.textFile("word.txt")
print("lines.first():{0}".format(lines.first()))
lineLengths = lines.map(lambda s : len(s))
totalLength = lineLengths.reduce( lambda a, b : a + b)
print("lineLengths:{0}".format(lineLengths))
print("totalLength:{0}".format(totalLength))
上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。
第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。
第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。
(4) 持久化
前面我们已经说过,在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
比如,下面就是多次计算同一个DD的例子:
list = ["Hadoop","Spark","Hive"]
rdd = sc.parallelize(list)
print(rdd.count()) #行动操作,触发一次真正从头到尾的计算
print(','.join(rdd.collect())) #行动操作,触发一次真正从头到尾的计算
结果
3
Hadoop,Spark,Hive
(5) 分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言: *本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N; *Apache Mesos:默认的分区数为8; *Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;
因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:
array = [1,2,3,4,5]
rdd = sc.parallelize(array,2) #设置两个分区
对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。
如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。
3.2 Spark入门:键值对RDD http://dblab.xmu.edu.cn/blog/1706-2/
(1)键值对RDD的创建
第一种创建方式:从文件中加载
lines = sc.textFile("word.txt")
pairRDD = lines.flatMap(lambda line : line.split()).map(lambda word : (word,1))
pairRDD.foreach(print)
pairRDD.first()
print (pairRDD.collect())
运行结果
[('面', 1), ('第', 1), ('一', 1), ('行', 1), ('首', 1), ('先', 1), ('从', 1), ('外', 1), ('部', 1), ('文', 1), ('件', 1), ('d', 1), ('a', 1), ('t', 1), ('a', 1), ('.', 1), ('t', 1), ('x', 1), ('t', 1), ('中', 1), ('构', 1), ('建', 1), ('得', 1), ('到', 1), ('一', 1), ('个', 1), ('R', 1), ('D', 1), ('D', 1), (',', 1), ('名', 1), ('称', 1), ('为', 1), ('l', 1), ('i', 1), ('n', 1), ('e', 1), ('s', 1)]
第二种创建方式:通过并行集合(列表)创建RDD
list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
airRDD = rdd.map(lambda word : (word,1))
print (airRDD.collect())
运行结果
[('Hadoop', 1), ('Spark', 1), ('Hive', 1), ('Spark', 1)]
(2) 常用的键值对转换操作
常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我们通过实例来介绍。
1.reduceByKey(func)
reduceByKey(func)的功能是,使用func函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。
我们对上面第二种方式创建得到的pairRDD进行reduceByKey()操作,代码如下:
airRDD = airRDD.reduceByKey(lambda a,b : a+b)
print (airRDD.collect())
[('Hadoop', 1), ('Spark', 2), ('Hive', 1)]
2.groupByKey()
groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。 我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:
groupRDD = pairRDD.groupByKey()
print (groupRDD.collect())
3.keys()
keys()只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys()后得到的结果是一个RDD[Int],内容是{“spark”,”spark”,”hadoop”,”hadoop”}。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:
keyRDD = pairRDD.keys()
print (keyRDD.collect())
['面', '第', '一', '行', '首', '先', '从', '外', '部', '文', '件', 'd', 'a', 't', 'a', '.', 't', 'x', 't', '中', '构', '建', '得', '到', '一', '个', 'R', 'D', 'D', ',', '名', '称', '为', 'l', 'i', 'n', 'e', 's']
4.values()
values()只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用values()后得到的结果是一个RDD[Int],内容是{1,2,3,5}。
我们对上面第二种方式创建得到的pairRDD进行values()操作,代码如下:
valuesRDD = pairRDD.values()
print (valuesRDD.collect())
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
5.sortByKey()
sortByKey()的功能是返回一个根据键排序的RDD。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:
sortByKeyRDD = pairRDD.sortByKey()
print (sortByKeyRDD.collect())
[('.', 1), ('D', 1), ('D', 1), ('R', 1), ('a', 1), ('a', 1), ('d', 1), ('e', 1), ('i', 1), ('l', 1), ('n', 1), ('s', 1), ('t', 1), ('t', 1), ('t', 1), ('x', 1), ('一', 1), ('一', 1), ('个', 1), ('中', 1), ('为', 1), ('从', 1), ('件', 1), ('先', 1), ('到', 1), ('名', 1), ('外', 1), ('建', 1), ('得', 1), ('文', 1), ('构', 1), ('称', 1), ('第', 1), ('行', 1), ('部', 1), ('面', 1), ('首', 1), (',', 1)]
6.mapValues(func)
我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:
mapValuesRDD = pairRDD.mapValues(lambda x : x+1)
print (mapValuesRDD.collect())
[('面', 2), ('第', 2), ('一', 2), ('行', 2), ('首', 2), ('先', 2), ('从', 2), ('外', 2), ('部', 2), ('文', 2), ('件', 2), ('d', 2), ('a', 2), ('t', 2), ('a', 2), ('.', 2), ('t', 2), ('x', 2), ('t', 2), ('中', 2), ('构', 2), ('建', 2), ('得', 2), ('到', 2), ('一', 2), ('个', 2), ('R', 2), ('D', 2), ('D', 2), (',', 2), ('名', 2), ('称', 2), ('为', 2), ('l', 2), ('i', 2), ('n', 2), ('e', 2), ('s', 2)]
7.join
join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
比如,pairRDD1是一个键值对集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一个键值对集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。对于这个实例,我们下面在pyspark中运行一下:
pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])
pairRDD2 = sc.parallelize([('spark','fast')])
pairRDD3 = pairRDD1.join(pairRDD2)
print (pairRDD3.collect())
[('spark', (1, 'fast')), ('spark', (2, 'fast'))]
3.3 Spark入门:共享变量 http://dblab.xmu.edu.cn/blog/1707-2/
Spark中的两个重要抽象是RDD和共享变量。上一章我们已经介绍了RDD,这里介绍共享变量。
在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候,需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在所有不同节点之间进行累加计算(比如计数或者求和)。
广播变量
广播变量(broadcast variables)允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。
可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value
[1, 2, 3]
这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。
累加器
累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生地支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。
一个数值型的累加器,可以通过调用SparkContext.accumulator()来创建。运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。
下面是一个代码实例,演示了使用累加器来对一个数组中的元素进行求和:
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))
accum.value
10
3.4 数据读写
3.4.1 Spark入门:文件数据读写http://dblab.xmu.edu.cn/blog/1708-2/
不同文件格式的文件系统的数据读写
下面分别介绍本地文件系统的数据读写和分布式文件系统HDFS的数据读写。
本地文件文件的数据读写
textFile = sc.textFile("word.txt")
textFile.first()
'面 第 一 行 首 先 从 外 部 文 件 d a t a . t x t 中 构 建 得 到 一 个 R D D ,'
本地 JSON 的数据读写
jsonStr = sc.textFile("people.json")
jsonStr.collect()
['{"name":"Michael"}',
'{"name":"Andy", "age":30}',
'{"name":"Justin", "age":19}']
从上面执行结果可以看出,people.json文件加载到RDD中以后,在RDD中存在三个字符串。我们下面要做的事情,就是把这三个JSON格式的字符串解析出来,比如说,第一个字符串{“name”:”Michael”},经过解析后,解析得到key是”name”,value是”Michael”。
现在我们编写程序完成对上面字符串的解析工作。
Scala中有一个自带的JSON库——scala.util.parsing.json.JSON,可以实现对JSON数据的解析。JSON.parseFull(jsonString:String)函数,以一个JSON字符串作为输入并进行解析,如果解析成功则返回一个Some(map: Map[String, Any]),如果解析失败则返回None。
因此,我们可以使用模式匹配来处理解析结果
请执行以下命令:
import json
inputFile = "people.json"
jsonStrs = sc.textFile(inputFile)
result = jsonStrs.map(lambda s : json.loads(s))
result.collect()
[{'name': 'Michael'},
{'name': 'Andy', 'age': 30},
{'name': 'Justin', 'age': 19}]