粉丝:什么情况下,hive 只会产生一个reduce任务,而没有maptask

浪尖聊大数据

共 5387字,需浏览 11分钟

 ·

2021-04-02 17:18

今天下午,在微信群里看到粉丝聊天,提到了一个某公司的面试题:

什么情况下,hive 只会产生一个reduce任务,而没有maptask


这个问题是不是很神奇?

我们常规使用的mapreducer任务执行过程大致如下图:

appmaster通过某种策略计算数据源可以做多少分片(getSplits方法),对应的生成固定数量的maptask,假如存在shuffle的话,就根据默认或者指定的reducer数,将数据分散给特定数量的reducer。不存在shuffle的话,reducer可以为零的。

正常逻辑:

mapTask的职责就是负责读数据,做ETL,也可以利用combiner局部聚合;ReduceTask输入严重依赖于mapper输出,所以‘一直’的逻辑仅有reducer无法执行的。

没有maptask,仅有一个reducerTask的hive任务。有点违背我们的使用常识了哦。

其实,正常使用的情况下,hive的sql模式执行引擎还是主要依赖于hadoop的mapreduce计算框架。

但是仅仅依赖于sql,然后依赖于hadoop的mapreduce,显然不能满足hive的野心的。所以hive想提供一个单独的可以使用java编写的hive自己的map/reduce的框架。

你在hive搜索reducer可以直接找到hive自己的reducer。

hql常规解析后依赖的mapreduce主要还是ExecReducer/ExecMapper--通过实现hadoop的mapper和reducer来实现具体执行逻辑。

hive自己的mapreducer有一个实现案例,就是GenericMR,实现源码如下:

public final class GenericMR {  public void map(final InputStream in, final OutputStream out,      final Mapper mapper) throws Exception {    map(new InputStreamReader(in), new OutputStreamWriter(out), mapper);  }
public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception { handle(in, out, new RecordProcessor() { @Override public void processNext(RecordReader reader, Output output) throws Exception { mapper.map(reader.next(), output); } }); }
public void reduce(final InputStream in, final OutputStream out, final Reducer reducer) throws Exception { reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer); }
public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception { handle(in, out, new RecordProcessor() { @Override public void processNext(RecordReader reader, Output output) throws Exception { reducer.reduce(reader.peek()[0], new KeyRecordIterator( reader.peek()[0], reader), output); } }); }
private void handle(final Reader in, final Writer out, final RecordProcessor processor) throws Exception { final RecordReader reader = new RecordReader(in); final OutputStreamOutput output = new OutputStreamOutput(out);
try { while (reader.hasNext()) { processor.processNext(reader, output); } } finally { try { output.close(); } finally { reader.close(); } } }
private static interface RecordProcessor { void processNext(final RecordReader reader, final Output output) throws Exception; }
private static final class KeyRecordIterator implements Iterator<String[]> { private final String key; private final RecordReader reader;
private KeyRecordIterator(final String key, final RecordReader reader) { this.key = key; this.reader = reader; }
@Override public boolean hasNext() { return (reader.hasNext() && key.equals(reader.peek()[0])); }
@Override public String[] next() { if (!hasNext()) { throw new NoSuchElementException(); }
return reader.next(); }
@Override public void remove() { throw new UnsupportedOperationException(); } }
private static final class RecordReader { private final BufferedReader reader; private String[] next;
private RecordReader(final InputStream in) { this(new InputStreamReader(in)); }
private RecordReader(final Reader in) { reader = new BufferedReader(in); next = readNext(); }
private String[] next() { final String[] ret = next;
next = readNext();
return ret; }
private String[] readNext() { try { final String line = reader.readLine(); return (line == null ? null : line.split("\t")); } catch (final Exception e) { throw new RuntimeException(e); } }
private boolean hasNext() { return next != null; }
private String[] peek() { return next; }
private void close() throws Exception { reader.close(); } }
private static final class OutputStreamOutput implements Output { private final PrintWriter out;
private OutputStreamOutput(final OutputStream out) { this(new OutputStreamWriter(out)); }
private OutputStreamOutput(final Writer out) { this.out = new PrintWriter(out); }
public void close() throws Exception { out.close(); }
@Override public void collect(String[] record) throws Exception { out.println(_join(record, "\t")); }
private static String _join(final String[] record, final String separator) { if (record == null || record.length == 0) { return ""; } final StringBuilder sb = new StringBuilder(); for (int i = 0; i < record.length; i++) { if (i > 0) { sb.append(separator); } sb.append(record[i]); } return sb.toString(); } }}

重点:

常规的mapreducer的reducer输入依赖于mapper的输出的,所以无法单独执行。但是GenericMR的实现reducer里可以直接支持InputStream/Reader,所以就可以直接生成java的指定输入流或者reader即可。


new GenericMR().reduce(new StringReader("a\tb\tc"), new StringWriter(),          new Reducer() {        public void reduce(String key, Iterator<String[]> records,            Output output) throws Exception {          while (true) {            records.next();          }        }});


这个问题确实很另类,浪尖估计是被面试者简历写了精通hive源码,才被大佬由此一问。要不正常使用者,不会注意到这个框架。

不过这个也给大家提个醒,关注框架使用的同时,也要关注框架的历史及发展。

总结一下就是:hive野心很大,不想仅仅限于hql,想提供一个单独的可以用java编写的hive自己的map/reduce计算框架。

欢迎关注浪尖,留言更多比较有趣的问题哦!

浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报