CompletionService 介绍

共 4732字,需浏览 10分钟

 ·

2021-05-14 08:09

介绍

java.util.concurrent.CompletionService是对 ExecutorService的一个功能增强封装,优化了获取异步操作结果的接口。

使用场景

假设我们要向线程池提交一批任务,并获取任务结果。一般的方式是提交任务后,从线程池得到一批 Future对象集合,然后依次调用其 get()方法。

这里有个问题:因为我们会要按固定的顺序来遍历 Future元素,而 get()方法又是阻塞的,因此如果某个 Future对象执行时间太长,会使得我们的遍历过程阻塞在该元素上,无法及时从后面早已完成的 Future当中取得结果。

CompletionService解决了这个问题。它本身不包含线程池,创建一个 CompletionService需要先创建一个 Executor。下面是一个例子:

String result = completionService.take().get();

这个 take()方法返回的是最早完成的任务的结果,这个就解决了一个任务被另一个任务阻塞的问题。下面是一个完整的例子:

示例

final SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");


        CompletionService<String> csv = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(10));


        // 此线程池运行5个线程

        for (int i = 0; i < 5; i++) {

            final int index = i;

            csv.submit(new Callable<String>() {

                @Override

                public String call() throws Exception {

                    System.out.println("Thread-" + index + "-begin-" + sf.format(new Date()));

                    try {

                        Thread.sleep(1000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                    System.out.println("Thread-" + index + "-end-" + sf.format(new Date()));

                    return "index-" + index;

                }


            });

        }


        try {

            Future<String> f = csv.poll(); // poll方法 返回的Future可能为 null,因为poll 是非阻塞执行的

            if (f != null) {

                System.out.println(f.get());

            } else {

                System.out.println("使用poll 获取到的Future为 null");

            }

        } catch (Exception e1) {

            e1.printStackTrace();

        }


        for (int i = 0; i < 5; i++) {

            try {

                // csv.take() 返回的是 最先完成任务的 Future 对象,take 方法时阻塞执行的

                System.out.println(csv.take().get());

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

这个例子的执行结果如下所示:

// 示例1:像使用 ExecutorService 一样使用 CompletionService
10:22:32:271 - Task 4 started, duration=400
10:22:32:271 - Task 3 started, duration=600
10:22:32:271 - Task 2 started, duration=800
10:22:32:271 - Task 1 started, duration=1000
10:22:32:687 - Task 4 completed.
10:22:32:888 - Task 3 completed.
10:22:33:089 - Task 2 completed.
10:22:33:303 - Task 1 completed.
10:22:33:303 - Result of task 1
10:22:33:303 - Result of task 2
10:22:33:303 - Result of task 3
10:22:33:303 - Result of task 4
// 示例2:按标准方式使用 CompletionService
10:22:33:305 - Task 5 started, duration=1000
10:22:33:305 - Task 7 started, duration=600
10:22:33:305 - Task 6 started, duration=800
10:22:33:305 - Task 8 started, duration=400
10:22:33:718 - Task 8 completed.
10:22:33:718 - Result of task 8
10:22:33:918 - Task 7 completed.
10:22:33:918 - Result of task 7
10:22:34:119 - Task 6 completed.
10:22:34:119 - Result of task 6
10:22:34:320 - Task 5 completed.
10:22:34:320 - Result of task 5

可以看出,在示例 1 中,虽然 Task 4 执行时间只有 400ms,但因为我们是按照 1-2-3-4 的顺序依次取结果,因此 Task 4 完成后并没有马上打印出结果来。而在示例 2 中,对每个 Task 都是在完成时立刻就将结果打印出来了。这就是 CompletionService的优势所在。

原理解释

CompletionService之所以能够做到这点,是因为它没有采取依次遍历 Future 的方式,而是在中间加上了一个结果队列,任务完成后马上将结果放入队列,那么从队列中取到的就是最早完成的结果。

如果队列为空,那么 take()方法会阻塞直到队列中出现结果为止。此外 CompletionService还提供一个 poll()方法,返回值与 take()方法一样,不同之处在于它不会阻塞,如果队列为空则立刻返回 null。这算是给用户多一种选择。


浏览 36
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报