“既生 ExecutorService, 何生 CompletionService?”
ExecutorService VS CompletionService
ExecutorService executorService = Executors.newFixedThreadPool(4);
Listfutures = new ArrayList >();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {
Integer result = future.get();
// 其他业务逻辑
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );
Listfutures = new ArrayList >();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i=0; iInteger result = executorCompletionService.take().get();
// 其他业务逻辑
}
如果 Future 结果没有完成,调用 get() 方法,程序会阻塞在那里,直至获取返回结果
那 CompletionService 是怎么做到获取最先执行完的任务结果的呢?
远看CompletionService 轮廓
就是一个将异步任务的生产和任务完成结果的消费解耦的服务
CompletionService
自然也要围绕着几个关键字做文章了既然是异步任务,那自然可能用到 Runnable 或 Callable 既然能获取到结果,自然也会用到 Future 了
近看 CompletionService 源码
CompletionService
是一个接口,它简单的只有 5 个方法:Future submit(Callable ;task)
Futuresubmit(Runnable task, V result) ;
Futuretake() throws InterruptedException ;
Futurepoll() ;
Futurepoll(long timeout, TimeUnit unit) throws InterruptedException ;
Take: 如果队列为空,那么调用 take() 方法的线程会被阻塞 Poll: 如果队列为空,那么调用 poll() 方法的线程会返回 null Poll-timeout: 以超时的方式获取并移除阻塞队列中的第一个元素,如果超时时间到,队列还是空,那么该方法会返回 null
提交异步任务 (submit) 从队列中拿取并移除第一个元素 (take/poll)
CompletionService
只是接口,ExecutorCompletionService
是该接口的唯一实现类ExecutorCompletionService 源码分析
ExecutorCompletionService
有两种构造函数:private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue> completionQueue;
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
LinkedBlockingQueue
,任务执行结果就是加入到这个阻塞队列中的ExecutorCompletionService
,我们只需要知道一个问题的答案就可以了:它是如何将异步任务结果放到这个阻塞队列中的?
public Future submit(Callable {task)
if (task == null) throw new NullPointerException();
RunnableFuturef = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
finishCompletion()
方法:private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 重点 重点 重点
done();
callable = null; // to reduce footprint
}
protected void done() {
completionQueue.add(task);
}
CompletionService 的主要用途
假设你有一组针对某个问题的solvers,每个都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用(Result r)
void solve(Executor e,
Collection> solvers)
throws InterruptedException, ExecutionException {
CompletionServiceecs
= new ExecutorCompletionService(e);
for (Callables : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务
void solve(Executor e,
Collection> solvers)
throws InterruptedException {
CompletionServiceecs
= new ExecutorCompletionService(e);
int n = solvers.size();
List> futures
= new ArrayList>(n);
Result result = null;
try {
for (Callables : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
for (Futuref : futures)
// 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中
f.cancel(true);
}
if (result != null)
use(result);
}
总结
Dubbo 中的 Forking Cluster 多仓库文件/镜像下载(从最近的服务中心下载后终止其他下载过程) 多服务调用(天气预报服务,最先获取到的结果)
灵魂追问
通常处理结果还会用异步方式进行处理,如果采用这种方式,有哪些注意事项? 如果是你,你会选择使用无界队列吗?为什么?
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
评论