Spark Shuffle过程详解
对比 Hadoop MapReduce 和 Spark 的 Shuffle 过程
Shuffle write
spark.shuffle.file.buffer.kb
,默认是 32KB(Spark 1.1 版本以前是 100KB)。其实 bucket 是一个广义的概念,代表 ShuffleMapTask 输出结果经过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。
partitioner.partition(record.getKey()))
决定。每个 bucket 里面的数据会不断被写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 reducer 会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。
缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了
cores * R * 32 KB
。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。
spark.shuffle.consolidateFiles=true
来开启。Shuffle read
在什么时候 fetch,parent stage 中的一个 ShuffleMapTask 执行完还是等全部 ShuffleMapTasks 执行完?
边 fetch 边处理还是一次性 fetch 完再处理?
fetch 来的数据存放到哪里?
怎么获得要 fetch 的数据的存放位置?
在什么时候 fetch?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。理论上讲,一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。因为 fetch 来的 FileSegments 要先在内存做缓冲,所以一次 fetch 的 FileSegments 总大小不能太大。Spark 规定这个缓冲界限不能超过
spark.reducer.maxMbInFlight
,这里用 softBuffer 表示,默认大小为 48MB。一个 softBuffer 里面一般包含多个 FileSegment,但如果某个 FileSegment 特别大的话,这一个就可以填满甚至超过 softBuffer 的界限。边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。因为 Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。那么如何实现边 shuffle 边处理,而且流入的 records 是无序的?答案是使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 \
record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value)
,比如上面 WordCount 例子中的 func 就是hashMap.get(Key) + Value
,并将 func 的结果重新 put(key) 到 HashMap 中去。这个 func 功能上相当于 reduce(),但实际处理数据的方式与 MapReduce reduce() 有差别,差别相当于下面两段程序的差别。// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}MapReduce 可以在 process 函数里面可以定义任何数据结构,也可以将部分或全部的 values 都 cache 后再进行处理,非常灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另一个是当前读入的 record,它们经过 func 处理后的结果被下一个 record 处理时使用。因此一些算法比如求平均数,在 process 里面很好实现,直接
sum(values)/values.length
,而在 Spark 中 func 可以实现sum(values)
,但不好实现/values.length
。更多的 func 将会在下面的章节细致分析。fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果
spark.shuffle.spill = false
就只用内存。内存使用的是AppendOnlyMap
,类似 Java 的HashMap
,内存+磁盘使用的是ExternalAppendOnlyMap
,如果内存空间不足时,ExternalAppendOnlyMap
可以将 \records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。 怎么获得要 fetch 的数据的存放位置?在上一章讨论物理执行图中的 stage 划分的时候,我们强调 “一个 ShuffleMapStage 形成后,会将该 stage 最后一个 final RDD 注册到
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
,这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置”。因此,reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。
典型 transformation() 的 shuffle read
1. reduceByKey(func)
map 端的区别:map() 没有区别。对于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上进行 combine()。
reduce 端区别:MapReduce 的 shuffle 阶段先 fetch 数据,数据量到达一定规模后 combine(),再将剩余数据 merge-sort 后 reduce(),reduce() 非常灵活。Spark 边 fetch 边 reduce()(在 HashMap 上执行 func),因此要求 func 符合 commulative 的特性。
map 端区别:MapReduce 需要开一个大型环形缓冲区来暂存和排序 map() 的部分输出结果,但 combine() 不需要额外空间(除非用户自己定义)。Spark 需要 HashMap 内存数据结构来进行 combine(),同时输出 records 到磁盘上时也需要一个小的 buffer(bucket)。
reduce 端区别:MapReduce 需要一部分内存空间来存储 shuffle 过来的数据,combine() 和 reduce() 不需要额外空间,因为它们的输入数据分段有序,只需归并一下就可以得到。在 Spark 中,fetch 时需要 softBuffer,处理数据时如果只使用内存,那么需要 HashMap 来持有处理后的结果。如果使用内存+磁盘,那么在 HashMap 存放一部分处理后的数据。
2. groupByKey(numPartitions)
result = result ++ record.value
,功能是将每个 key 对应的所有 values 链接在一起。result 来自 hashMap.get(record.key),计算后的 result 会再次被 put 到 hashMap 中。与 reduceByKey() 的区别就是 groupByKey() 没有 map 端的 combine()。对于 groupByKey() 来说 map 端的 combine() 只是减少了重复 Key 占用的空间,如果 key 重复率不高,没必要 combine(),否则,最好能够 combine()。3. distinct(numPartitions)
result = result == null? record.value : result
,如果 HashMap 中没有该 record 就将其放入,否则舍弃。与 reduceByKey() 相同,在map 端存在 combine()。4. cogroup(otherRDD, numPartitions)
5. intersection(otherRDD) 和 join(otherRDD, numPartitions)
6. sortByKey(ascending, numPartitions)
7. coalesce(numPartitions, shuffle = true)
Shuffle read 中的 HashMap
1. AppendOnlyMap
remove(key)
方法。其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,白色部分存储 Value。如下图:destructiveSortedIterator(): Iterator[(K, V)]
方法,可以返回 Array 中排序后的 (K, V) pairs。实现方法很简单:先将所有 (K, V) pairs compact 到 Array 的前端,并使得每个 (K, V) 占一个位置(原来占两个),之后直接调用 Array.sort() 排序,不过这样做会破坏数组(key 的位置变化了)。2. ExternalAppendOnlyMap
内存剩余空间检测
与 Hadoop MapReduce 规定 reducer 中 70% 的空间可用于 shuffle-sort 类似,Spark 也规定 executor 中
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
的空间(默认是0.3 * 0.8
)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是这 24% 的空间不是完全用于一个 ExternalOnlyAppendMap 的,而是由在 executor 上同时运行的所有 reducer 共享的。为此,exectuor 专门持有一个ShuffleMemroyMap: HashMap[threadId, occupiedMemory]
来监控每个 reducer 中 ExternalOnlyAppendMap 占用的内存量。每当 AppendOnlyMap 要扩展时,都会计算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的内存 + 扩展后的内存 是会否会大于内存限制,大于就会将 AppendOnlyMap spill 到磁盘。有一点需要注意的是前 1000 个 records 进入 AppendOnlyMap 的时候不会启动是否要 spill 的检查,需要扩展时就直接在内存中扩展。AppendOnlyMap 大小估计
为了获知 AppendOnlyMap 占用的内存空间,可以在每次扩展时都将 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但这样做非常耗时。所以 Spark 设计了粗略的估算算法,算法时间复杂度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小变化及一共 insert 的 records 的个数来估算大小,具体见
SizeTrackingAppendOnlyMap
和SizeEstimator
。Spill 过程
与 shuffle write 一样,在 spill records 到磁盘上的时候,会建立一个 buffer 缓冲区,大小仍为
spark.shuffle.file.buffer.kb
,默认是 32KB。另外,由于 serializer 也会分配缓冲区用于序列化和反序列化,所以如果一次 serialize 的 records 过多的话缓冲区会变得很大。Spark 限制每次 serialize 的 records 个数为spark.shuffle.spill.batchSize
,默认是 10000。