RDD编程,熟悉算子,读写文件

DayNightStudy

共 10086字,需浏览 21分钟

 ·

2021-02-22 07:40

小编推荐

来源:子雨大数据

http://dblab.xmu.edu.cn/blog





内容:RDD编程,熟悉算子,读写文件


0 准备工作

1.启动 pyspark

from pyspark import SparkContextsc = SparkContext( 'local', 'test')

2.目录下创建 word.txt 和 person.json,内容分别为

面 第 一 行 首 先 从 外 部 文 件 d a t a . t x t 中 构 建 得 到 一 个 R D D ,名 称 为 l i n e s 


{"name":"Michael"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}


第3章 Spark编程基础

3.1  Spark入门:RDD编程
http://dblab.xmu.edu.cn/blog/1700-2/

(1) 从文件系统中加载数据创建RDD


textFile = sc.textFile("E:/pythonWp/sparkWP/wordCount/word.txt")wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)wordCount.collect()

运行效果:

[('This', 1), ('is', 1), .... ('and', 1), ('python', 1)]


(2) 通过并行集合(数组)创建RDD

nums = [1,2,3,4,5]rdd = sc.parallelize(nums)# 上面使用列表来创建。在Python中并没有数组这个基本数据类型,# 为了便于理解,你可以把列表当成其他语言的数组。

(3) RDD操作

RDD被创建好以后,在后续使用过程中一般会发生两种操作:

  • 转换(Transformation): 基于现有的数据集创建一个新的数据集。

  • 行动(Action):在数据集上进行运算,返回计算值。


1)  转换操作

对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

下面列出一些常见的转换操作(Transformation API):

  • filter(func):筛选出满足函数func的元素,并返回一个新的数据集

  • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集

  • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果

  • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

  • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合


2)  行动操作

行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。 下面列出一些常见的行动操作(Action API):

  • count() 返回数据集中的元素个数

  • collect() 以数组的形式返回数据集中的所有元素

  • first() 返回数据集中的第一个元素

  • take(n) 以数组的形式返回数据集中的前n个元素

  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

  • foreach(func) 将数据集中的每个元素传递到函数func中运行*


(3) 惰性机制

这里给出一段简单的代码来解释Spark的惰性机制。

lines = sc.textFile("word.txt")print("lines.first():{0}".format(lines.first()))lineLengths = lines.map(lambda s : len(s))totalLength = lineLengths.reduce( lambda a, b : a + b)print("lineLengths:{0}".format(lineLengths))print("totalLength:{0}".format(totalLength))


上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。

第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。

第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。


(4) 持久化

前面我们已经说过,在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

比如,下面就是多次计算同一个DD的例子:

list = ["Hadoop","Spark","Hive"]rdd = sc.parallelize(list)print(rdd.count()) #行动操作,触发一次真正从头到尾的计算print(','.join(rdd.collect())) #行动操作,触发一次真正从头到尾的计算

结果

3Hadoop,Spark,Hive

(5) 分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。

对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言: *本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N; *Apache Mesos:默认的分区数为8; *Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;

因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:


array = [1,2,3,4,5]rdd = sc.parallelize(array,2) #设置两个分区

对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。

如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。


3.2 Spark入门:键值对RDD
http://dblab.xmu.edu.cn/blog/1706-2/

(1)键值对RDD的创建

第一种创建方式:从文件中加载

lines = sc.textFile("word.txt")pairRDD = lines.flatMap(lambda line : line.split()).map(lambda word : (word,1))pairRDD.foreach(print)pairRDD.first()print (pairRDD.collect())

运行结果

[('面', 1), ('第', 1), ('一', 1), ('行', 1), ('首', 1), ('先', 1), ('从', 1), ('外', 1), ('部', 1), ('文', 1), ('件', 1), ('d', 1), ('a', 1), ('t', 1), ('a', 1), ('.', 1), ('t', 1), ('x', 1), ('t', 1), ('中', 1), ('构', 1), ('建', 1), ('得', 1), ('到', 1), ('一', 1), ('个', 1), ('R', 1), ('D', 1), ('D', 1), (',', 1), ('名', 1), ('称', 1), ('为', 1), ('l', 1), ('i', 1), ('n', 1), ('e', 1), ('s', 1)]


第二种创建方式:通过并行集合(列表)创建RDD

list = ["Hadoop","Spark","Hive","Spark"]rdd = sc.parallelize(list)airRDD = rdd.map(lambda word : (word,1))print (airRDD.collect())

运行结果

[('Hadoop', 1), ('Spark', 1), ('Hive', 1), ('Spark', 1)]


(2) 常用的键值对转换操作

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我们通过实例来介绍。

1.reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。

我们对上面第二种方式创建得到的pairRDD进行reduceByKey()操作,代码如下:

airRDD = airRDD.reduceByKey(lambda a,b : a+b)print (airRDD.collect())

[('Hadoop', 1), ('Spark', 2), ('Hive', 1)]

2.groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。 我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:

groupRDD = pairRDD.groupByKey()print (groupRDD.collect())

3.keys()

keys()只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys()后得到的结果是一个RDD[Int],内容是{“spark”,”spark”,”hadoop”,”hadoop”}。

我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:

keyRDD = pairRDD.keys()print (keyRDD.collect())

['面', '第', '一', '行', '首', '先', '从', '外', '部', '文', '件', 'd', 'a', 't', 'a', '.', 't', 'x', 't', '中', '构', '建', '得', '到', '一', '个', 'R', 'D', 'D', ',', '名', '称', '为', 'l', 'i', 'n', 'e', 's']

4.values()

values()只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用values()后得到的结果是一个RDD[Int],内容是{1,2,3,5}。

我们对上面第二种方式创建得到的pairRDD进行values()操作,代码如下:

valuesRDD = pairRDD.values()print (valuesRDD.collect())
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

5.sortByKey()

sortByKey()的功能是返回一个根据键排序的RDD。

我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:

sortByKeyRDD = pairRDD.sortByKey()print (sortByKeyRDD.collect())

[('.', 1), ('D', 1), ('D', 1), ('R', 1), ('a', 1), ('a', 1), ('d', 1), ('e', 1), ('i', 1), ('l', 1), ('n', 1), ('s', 1), ('t', 1), ('t', 1), ('t', 1), ('x', 1), ('一', 1), ('一', 1), ('个', 1), ('中', 1), ('为', 1), ('从', 1), ('件', 1), ('先', 1), ('到', 1), ('名', 1), ('外', 1), ('建', 1), ('得', 1), ('文', 1), ('构', 1), ('称', 1), ('第', 1), ('行', 1), ('部', 1), ('面', 1), ('首', 1), (',', 1)]

6.mapValues(func)

我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。

我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:

mapValuesRDD = pairRDD.mapValues(lambda x : x+1)print (mapValuesRDD.collect())

[('面', 2), ('第', 2), ('一', 2), ('行', 2), ('首', 2), ('先', 2), ('从', 2), ('外', 2), ('部', 2), ('文', 2), ('件', 2), ('d', 2), ('a', 2), ('t', 2), ('a', 2), ('.', 2), ('t', 2), ('x', 2), ('t', 2), ('中', 2), ('构', 2), ('建', 2), ('得', 2), ('到', 2), ('一', 2), ('个', 2), ('R', 2), ('D', 2), ('D', 2), (',', 2), ('名', 2), ('称', 2), ('为', 2), ('l', 2), ('i', 2), ('n', 2), ('e', 2), ('s', 2)]

7.join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。

对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1是一个键值对集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一个键值对集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。对于这个实例,我们下面在pyspark中运行一下:

pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])pairRDD2 = sc.parallelize([('spark','fast')])pairRDD3 = pairRDD1.join(pairRDD2)print (pairRDD3.collect())

[('spark', (1, 'fast')), ('spark', (2, 'fast'))]


3.3 Spark入门:共享变量
http://dblab.xmu.edu.cn/blog/1707-2/

Spark中的两个重要抽象是RDD和共享变量。上一章我们已经介绍了RDD,这里介绍共享变量。

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候,需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在所有不同节点之间进行累加计算(比如计数或者求和)。


广播变量

广播变量(broadcast variables)允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。

可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:

broadcastVar = sc.broadcast([1, 2, 3])broadcastVar.value
[1, 2, 3]

这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。

累加器

累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生地支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。

一个数值型的累加器,可以通过调用SparkContext.accumulator()来创建。运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。

下面是一个代码实例,演示了使用累加器来对一个数组中的元素进行求和:

accum = sc.accumulator(0)sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))accum.value
10


3.4 数据读写
3.4.1 Spark入门:文件数据读写
http://dblab.xmu.edu.cn/blog/1708-2/

不同文件格式的文件系统的数据读写

下面分别介绍本地文件系统的数据读写和分布式文件系统HDFS的数据读写。


本地文件文件的数据读写

textFile = sc.textFile("word.txt")textFile.first()


'面 第 一 行 首 先 从 外 部 文 件 d a t a . t x t 中 构 建 得 到 一 个 R D D ,'


本地 JSON 的数据读写


jsonStr = sc.textFile("people.json")jsonStr.collect()

['{"name":"Michael"}', '{"name":"Andy", "age":30}', '{"name":"Justin", "age":19}']


从上面执行结果可以看出,people.json文件加载到RDD中以后,在RDD中存在三个字符串。我们下面要做的事情,就是把这三个JSON格式的字符串解析出来,比如说,第一个字符串{“name”:”Michael”},经过解析后,解析得到key是”name”,value是”Michael”。

现在我们编写程序完成对上面字符串的解析工作。

Scala中有一个自带的JSON库——scala.util.parsing.json.JSON,可以实现对JSON数据的解析。JSON.parseFull(jsonString:String)函数,以一个JSON字符串作为输入并进行解析,如果解析成功则返回一个Some(map: Map[String, Any]),如果解析失败则返回None。

因此,我们可以使用模式匹配来处理解析结果

请执行以下命令:


import json
inputFile = "people.json"jsonStrs = sc.textFile(inputFile)result = jsonStrs.map(lambda s : json.loads(s))result.collect()
[{'name': 'Michael'}, {'name': 'Andy', 'age': 30}, {'name': 'Justin', 'age': 19}]



浏览 61
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报