Flink 源码深度解析-Async IO的实现

程序源代码

共 13018字,需浏览 27分钟

 ·

2022-02-26 21:32

点击上方蓝色字体,选择“设为星标”
回复"面试"获取更多惊喜
Hi,我是王知无,一个大数据领域的原创作者。 
放心关注我,获取更多行业的一手消息。

Async I/O的使用方式

在Flink中使用Async I/O的话,需要有一个支持异步请求的客户端,或者以多线程异步的方式来将同步操作转化为异步操作调用;

以官方文档给出的说明为例:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture> resultFuture) throws Exception {
        // issue the asynchronous request, receive a future for result
        // 发起异步请求,返回结果是一个Future
        final Future result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        // 请求完成时的回调,将结果交给 ResultFuture
        CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream stream = ...;

// apply the async I/O transformation
// 应用async I/O转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
DataStream> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

AsyncDataStream提供了两种调用方法,分别是orderedWait和unorderedWait,这分别对应了有序和无序两种输出模式。

之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。

  • 在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;
  • 而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。

值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证watermark的正常处理,即在两个watermark之间的消息的异步请求结果可能是异步提交的,但在watermark之后的消息不能先于该watermark之前的消息提交。

由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。

Async I/O的实现

AsyncDataStream在运行时被转换为AsyncWaitOperator算子,它是AbstractUdfStreamOperator的子类。其AsyncWaitOperator的基本实现原理如下:

基本原理

AsyncWaitOperator算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。

因此,在AsyncWaitOperator内部采用了一种“生产者-消费者”模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue提供了一种队列的抽象,一个“消费者”线程Emitter从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色;

如图所示,AsyncWaitOperator主要由两部分组成:StreamElementQueue和Emitter。

StreamElementQueue是一个Promise队列,所谓Promise是一种异步抽象表示将来会有一个值,这个队列是未完成的Promise队列,也就是进行中的请求队列。Emitter是一个单独的线程,负责发送消息(收到的异步回复)给下游。

图中E5表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个“Promise” P5,然后将P5放入队列。最后调用AsyncFunction的asyncInvoke方法,该方法会向外部服务发起一个异步的请求,并注册回调。

该回调会在异步请求成功返回时调用AsyncCollector.collect方法将返回的结果交给框架处理。

实际上AsyncCollector是一个Promise,也就是 P5,在调用collect的时候会标记Promise为完成状态,并通知Emitter线程有完成的消息可以发送了。Emitter就会从队列中拉取完成的Promise,并从Promise中取出消息发送给下游。

public class AsyncWaitOperator
      extends AbstractUdfStreamOperator>
      implements OneInputStreamOperator, OperatorActions {
          
    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;               // 存储带有异步返回值的请求队列
    
    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry pendingStreamElementQueueEntry;
    
    private transient ExecutorService executor;
    
    /** Emitter for the completed stream element queue entries. */
    private transient Emitter emitter;                  // 异步返回后的消费线程
    
    /** Thread running the emitter. */
    private transient Thread emitterThread;
    
    @Override
    public void setup(StreamTask containingTask, StreamConfig config, Output> output) {
       super.setup(containingTask, config, output);
       this.checkpointingLock = getContainingTask().getCheckpointLock();
       this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()));
    
       // create the operators executor for the complete operations of the queue entries
       this.executor = Executors.newSingleThreadExecutor();
       // 根据不同的数据输出模式 有序、无序;选择构建不同的StreamElementQueue queue
       switch (outputMode) {
          case ORDERED:
             queue = new OrderedStreamElementQueue(
                capacity,
                executor,
                this);
             break;
          case UNORDERED:
             queue = new UnorderedStreamElementQueue(
                capacity,
                executor,
                this);
             break;
          default:
             throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
       }
    }
    
    @Override
    public void open() throws Exception {
       super.open();
       // create the emitter
       this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
    
       // start the emitter thread
       // 构建 消费者线程 emitter Thread 
       this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
       emitterThread.setDaemon(true);
       emitterThread.start();
       // .........
    }
    
    @Override
    public void processElement(StreamRecord element) throws Exception {
       final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
       // 注册一个定时器,在超时时调用 timeout 方法
       if (timeout > 0L) {
          // register a timeout for this AsyncStreamRecordBufferEntry
          long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
          final ScheduledFuture timerFuture = getProcessingTimeService().registerTimer(
             timeoutTimestamp,
             new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                   userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                }
             });
          // Cancel the timer once we've completed the stream record buffer entry. This will remove
          // the register trigger task
          streamRecordBufferEntry.onComplete(
             (StreamElementQueueEntry> value) -> {
                timerFuture.cancel(true);
             },
             executor);
       }
       // 加入队列
       addAsyncBufferEntry(streamRecordBufferEntry);
       // 发送异步请求
       userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }
 
 //尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
 private  void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
       assert(Thread.holdsLock(checkpointingLock));
       pendingStreamElementQueueEntry = streamElementQueueEntry;
       while (!queue.tryPut(streamElementQueueEntry)) { // 将该请求加入队列;如果队列已满(到达异步请求的上限),会阻塞
          // we wait for the emitter to notify us if the queue has space left again
          checkpointingLock.wait();
       }
       pendingStreamElementQueueEntry = null;
    }
}

public class Emitter implements Runnable {
    @Override
    public void run() {
       try {
          while (running) {
             LOG.debug("Wait for next completed async stream element result.");
             // 从队列阻塞地获取元素,之后再向下游传递
             AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
             output(streamElementEntry);
          }
       } catch (InterruptedException e) {
             // .........
       } 
    }
}

有序模式

在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue的具体是实现是OrderedStreamElementQueue。

OrderedStreamElementQueue的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。

有序模式比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括watermark),都会包装成Promise并按到达顺序放入该队列。

如下图所示,尽管P4的结果先返回,但并不会发送,只有P1(队首)的结果返回了才会触发Emitter拉取队首元素进行发送。如下图所示:

public class OrderedStreamElementQueue implements StreamElementQueue {
 /** Capacity of this queue. */
 private final int capacity;

 /** Queue for the inserted StreamElementQueueEntries. */
 private final ArrayDeque> queue;
 
    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {  // 从队列中阻塞地获取已异步完成的元素
       lock.lockInterruptibly();
       try {
          while (queue.isEmpty() || !queue.peek().isDone()) {
             headIsCompleted.await();
          }
          // 只有队列头部的请求完成后才解除阻塞状态
          LOG.debug("Peeked head element from ordered stream element queue with filling degree " + "({}/{}).", queue.size(), capacity);
          return queue.peek();
       } finally {
          lock.unlock();
       }
    }
 
  @Override
    public AsyncResult poll() throws InterruptedException {
       lock.lockInterruptibly();
       try {
          while (queue.isEmpty() || !queue.peek().isDone()) {
             headIsCompleted.await();
          }
          notFull.signalAll();
          LOG.debug("Polled head element from ordered stream element queue. New filling degree " + "({}/{}).", queue.size() - 1, capacity);
          return queue.poll();
       } finally {
          lock.unlock();
       }
    }
    
    @Override
    public  boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
       lock.lockInterruptibly();  // 将该请求加入队列;如果队列已满(到达异步请求的上限),返回false,其外部会阻塞
       try {
          if (queue.size() < capacity) {   // 未达容量上限
             addEntry(streamElementQueueEntry);  
             LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity);
             return true;
          } else {
             LOG.debug("Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity);
             return false;
          }
       } finally {
          lock.unlock();
       }
    }
}

无序模式

在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。

当然,在使用“事件时间”的情况下,要保证watermark语义的正确性。

在使用“处理时间”的情况下,由于不存在Watermark,因此可以看作一种特殊的情况。

在UnorderedStreamElementQueue中巧妙地实现了这两种情况。

ProcessingTime无序

ProcessingTime无序也比较简单,因为没有watermark,不需要协调watermark与消息的顺序性,所以使用两个队列就能实现,一个uncompletedQueue、一个completedQueue。所有新进入该算子的元素,同样的包装成Promise并放入uncompletedQueue队列,当uncompletedQueue队列中任意的Promise返回了数据,则将该Promise移到completedQueue队列中,并通知Emitter消费。如下图所示:

EventTime无序

EventTime无序类似于有序与ProcessingTime无序的结合体。因为有watermark,需要协调watermark与消息之间的顺序性,所以uncompletedQueue中存放的元素从原先的Promise变成了Promise集合。

如果进入算子的是消息元素,则会包装成Promise放入队尾的集合中。

如果进入算子的是watermark,也会包装成Promise并放到一个独立的集合中,再将该集合加入到uncompletedQueue队尾,最后再创建一个空集合加到uncompletedQueue队尾。

这样,watermark就成了消息顺序的边界。

只有处在队首的集合中的Promise返回了数据,才能将该Promise移到completedQueue队列中,由Emitter消费发往下游。

只有队首集合空了,才能处理第二个集合。这样就保证了当且仅当某个watermark之前所有的消息都已经被发送了,该watermark才能被发送。

过程如下图所示:

public class UnorderedStreamElementQueue implements StreamElementQueue {
    /** Queue of uncompleted stream element queue entries segmented by watermarks. */
    private final ArrayDeque>> uncompletedQueue;
    
    /** Queue of completed stream element queue entries. */
    private final ArrayDeque> completedQueue;
    
    /** First (chronologically oldest) uncompleted set of stream element queue entries. */
    private Set> firstSet;
    
    // Last (chronologically youngest) uncompleted set of stream element queue entries. New
    // stream element queue entries are inserted into this set.
    private Set> lastSet;

    @Override
    public  boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
       lock.lockInterruptibly();
       try {
          if (numberEntries < capacity) {
             addEntry(streamElementQueueEntry);
             LOG.debug("Put element into unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
             return true;
          } else {
             LOG.debug("Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", numberEntries, capacity);
             return false;
          }
       } finally {
          lock.unlock();
       }
    }
    
    private  void addEntry(StreamElementQueueEntry streamElementQueueEntry) {
       assert(lock.isHeldByCurrentThread());
       if (streamElementQueueEntry.isWatermark()) {
          // 如果是watermark,就要构造一个只包含这个watermark的set加入到uncompletedQueue队列中
          lastSet = new HashSet<>(capacity);
          if (firstSet.isEmpty()) {
             firstSet.add(streamElementQueueEntry);
          } else {
             Set> watermarkSet = new HashSet<>(1);
             watermarkSet.add(streamElementQueueEntry);
             uncompletedQueue.offer(watermarkSet);
          }
          uncompletedQueue.offer(lastSet);
       } else {
          lastSet.add(streamElementQueueEntry);  // 正常记录,加入lastSet中
       }
    
       streamElementQueueEntry.onComplete(       // 设置异步请求完成后的回调
          (StreamElementQueueEntry value) -> {
             try {
                onCompleteHandler(value);
             } catch (InterruptedException e) {
                // ......
             }
          }, executor);
       numberEntries++;
    }

    // 异步请求完成的回调
    public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
       lock.lockInterruptibly();
       try {
          // 如果完成的异步请求在firstSet中,那么就将firstSet中已完成的异步请求转移到completedQueue中
          if (firstSet.remove(streamElementQueueEntry)) {  
             completedQueue.offer(streamElementQueueEntry);
             while (firstSet.isEmpty() && firstSet != lastSet) {
                // 如果firset中所有的异步请求都完成了,那么就从uncompletedQueue获取下一个集合作为firstSet
                firstSet = uncompletedQueue.poll();
                Iterator> it = firstSet.iterator();
                while (it.hasNext()) {
                   StreamElementQueueEntry bufferEntry = it.next();
                   if (bufferEntry.isDone()) {
                      completedQueue.offer(bufferEntry);
                      it.remove();
                   }
                }
             }
             LOG.debug("Signal unordered stream element queue has completed entries.");
             hasCompletedEntries.signalAll();
          }
       } finally {
          lock.unlock();
       }
    }
    
    @Override
    public AsyncResult poll() throws InterruptedException {
       lock.lockInterruptibly();
       try {
          // 等待completedQueue中的元素
          while (completedQueue.isEmpty()) {
             hasCompletedEntries.await();
          }
          numberEntries--;
          notFull.signalAll();
          LOG.debug("Polled element from unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
          return completedQueue.poll();
       } finally {
          lock.unlock();
       }
    }    
}

容错

在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态中取出这些消息,再重新处理一遍。为了保证exactly-once特性,对于异步调用已经完成,且结果已经由emitter提交给下游的消息就无需保存在快照中。

public class AsyncWaitOperator
      extends AbstractUdfStreamOperator>
      implements OneInputStreamOperator, OperatorActions {
          
 /** Recovered input stream elements. */
 private transient ListState recoveredStreamElements;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
       super.initializeState(context);
       recoveredStreamElements = context
          .getOperatorStateStore()
          .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    }
    
    @Override
    public void open() throws Exception {
       super.open();
       // create the emitter
       // 创建emitter消费线程
       
       // process stream elements from state, since the Emit thread will start as soon as all
       // elements from previous state are in the StreamElementQueue, we have to make sure that the
       // order to open all operators in the operator chain proceeds from the tail operator to the
       // head operator.
       // 状态恢复的时候,从状态中取出所有未完成的消息,重新处理一遍
       if (recoveredStreamElements != null) {
          for (StreamElement element : recoveredStreamElements.get()) {
             if (element.isRecord()) {
                processElement(element.asRecord());
             }
             else if (element.isWatermark()) {
                processWatermark(element.asWatermark());
             }
             else if (element.isLatencyMarker()) {
                processLatencyMarker(element.asLatencyMarker());
             }
             else {
                throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
             }
          }
          recoveredStreamElements = null;
       }
    }
    
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
       super.snapshotState(context);
       // 先清除状态
       ListState partitionableState =
          getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
       partitionableState.clear();
       
       // 将所有未完成处理请求对应的消息加入状态中
       Collection> values = queue.values();
       try {
          for (StreamElementQueueEntry value : values) {
             partitionableState.add(value.getStreamElement());
          }
    
          // add the pending stream element queue entry if the stream element queue is currently full
          if (pendingStreamElementQueueEntry != null) {
             partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
          }
       } catch (Exception e) {
          partitionableState.clear();
          throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e);
       }
    }
}
如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!


2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)
互联网最坏的时代可能真的来了
我在B站读大学,大数据专业
我们在学习Flink的时候,到底在学习什么?
193篇文章暴揍Flink,这个合集你需要关注一下
Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
我们在学习Spark的时候,到底在学习什么?
在所有Spark模块中,我愿称SparkSQL为最强!
硬刚Hive | 4万字基础调优面试小总结
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解析
【面试&个人成长】2021年过半,社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么?「硬刚Hive续集」
浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报