昨天的一个问题及答案(关键字Gzip、MapReduce、Spark)

共 7025字,需浏览 15分钟

 ·

2021-09-27 16:21

点击上方蓝色字体,选择“设为星标”

回复”面试“获取更多惊喜

问题是这样的:

HDFS上存储了一个大小10G不可分割压缩格式的文件(gzip格式),当有一个mr任务去读取这个文件的时候会产生多少个map task?spark去读取这种不可分割格式的大文件时是怎么处理的呢?

关于这个问题,大家应该都看过这个:

Hadoop所支持的几种压缩格式

gzip文件最大的特点在于:不可分割。

OK,我们知道gzip不可分割了。那么一个10G的gzip文件在HDFS是怎么存储的呢?

首先,一个10G的gzip文件在HDFS是放在一个DataNode上,但是blocks=ceil(10G/128M),副本还是3份(hadoop2.0 默认),因为gzip不可分割。

意思就是,这个gzip文件会被存储在一个DataNode上,但是占用的block数量还是 10G/每个block的大小(假设是128M),并且向上取整。

其次,MapReduce在读gzip文件的时候要指定解压方法,就是GzipCodec。然后用InputStream方法去读,MapTask的个数和读取一般文件的个数是一样的。

关于Hadoop Maptask个数,有一个计算公式。代码逻辑和计算公式如下:

作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:

public List<InputSplit> getSplits(JobContext job
                                    ) throws IOException {
    //getFormatMinSplitSize():始终返回1
    //getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

    //getMaxSplitSize(job):获取"mapred.max.split.size"的值,
    //默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus>files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        //计算split大小
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        //计算split个数
        long bytesRemaining = length;    //bytesRemaining表示剩余字节数
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }

    // Save the number of input files in the job-conf
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:

 protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
     return Math.max(minSize, Math.min(maxSize, blockSize));
}

接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:

  • 文件剩余字节数/splitSize > 1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize;
  • 文件剩余字节数/splitSize < 1.1,剩余的部分全都作为一个split(这主要是考虑到,不用为剩余的很少的字节数一些启动一个Map Task)

我们发现,在默认配置下,split大小和block大小是相同的。这是不是为了防止这种情况:

一个split如果对应的多个block,若这些block大多不在本地,则会降低Map Task的本地性,降低效率。到这里split的划分就介绍完了,但是有两个问题需要考虑:

  1. 如果一个record跨越了两个block该怎么办?

这个可以看到,在Map Task读取block的时候,每次是读取一行的,如果发现块的开头不是上一个文件的结束,那么抛弃第一条record,因为这个record会被上一个block对应的Map Task来处理。那么,第二个问题来了:

  1. 上一个block对应的Map Task并没有最后一条完整的record,它又该怎么办?

一般来说,Map Task在读block的时候都会多读后续的几个block,以处理上面的这种情况。

最后,Spark在读取gzip这种不可分割文件的时候,就退化成从单个task读取、单个core执行任务,很容易产生性能瓶颈。你可以做个测试。在spark的页面上可以看到效果。

基于以上所以,gzip格式最好提前进行分割成小文件或者换格式,因多个文件可以并行读取。另一个办法是read文件后调用repartition操作强制将读取多数据重新均匀分配到不同的executor上,但这个操作会导致大量单节点性能占用,因此该格式建议不在spark上使用。

gzip问题这么多,常用的场景我能想到的只有一个,就是每天的日志文件。单个日志文件不太大,百兆以内。其他的场景暂时想不到。



八千里路云和月 | 从零到大数据专家学习路径指南

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」


你好,我是王知无,一个大数据领域的硬核原创作者。

做过后端架构、数据中间件、数据平台&架构、算法工程化。

专注大数据领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。

浏览 11
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报