“既生 ExecutorService, 何生 CompletionService?”
Hollis
共 2435字,需浏览 5分钟
· 2020-08-21
ExecutorService VS CompletionService
ExecutorService executorService = Executors.newFixedThreadPool(4);
Listfutures = new ArrayList >();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {
Integer result = future.get();
// 其他业务逻辑
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );
Listfutures = new ArrayList >();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i=0; iInteger result = executorCompletionService.take().get();
// 其他业务逻辑
}
如果 Future 结果没有完成,调用 get() 方法,程序会阻塞在那里,直至获取返回结果
那 CompletionService 是怎么做到获取最先执行完的任务结果的呢?
远看CompletionService 轮廓
就是一个将异步任务的生产和任务完成结果的消费解耦的服务
CompletionService
自然也要围绕着几个关键字做文章了既然是异步任务,那自然可能用到 Runnable 或 Callable 既然能获取到结果,自然也会用到 Future 了
近看 CompletionService 源码
CompletionService
是一个接口,它简单的只有 5 个方法:Future submit(Callable ;task)
Futuresubmit(Runnable task, V result) ;
Futuretake() throws InterruptedException ;
Futurepoll() ;
Futurepoll(long timeout, TimeUnit unit) throws InterruptedException ;
Take: 如果队列为空,那么调用 take() 方法的线程会被阻塞 Poll: 如果队列为空,那么调用 poll() 方法的线程会返回 null Poll-timeout: 以超时的方式获取并移除阻塞队列中的第一个元素,如果超时时间到,队列还是空,那么该方法会返回 null
提交异步任务 (submit) 从队列中拿取并移除第一个元素 (take/poll)
CompletionService
只是接口,ExecutorCompletionService
是该接口的唯一实现类ExecutorCompletionService 源码分析
ExecutorCompletionService
有两种构造函数:private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue> completionQueue;
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
LinkedBlockingQueue
,任务执行结果就是加入到这个阻塞队列中的ExecutorCompletionService
,我们只需要知道一个问题的答案就可以了:它是如何将异步任务结果放到这个阻塞队列中的?
public Future submit(Callable {task)
if (task == null) throw new NullPointerException();
RunnableFuturef = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
finishCompletion()
方法:private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 重点 重点 重点
done();
callable = null; // to reduce footprint
}
protected void done() {
completionQueue.add(task);
}
CompletionService 的主要用途
假设你有一组针对某个问题的solvers,每个都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用(Result r)
void solve(Executor e,
Collection> solvers)
throws InterruptedException, ExecutionException {
CompletionServiceecs
= new ExecutorCompletionService(e);
for (Callables : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务
void solve(Executor e,
Collection> solvers)
throws InterruptedException {
CompletionServiceecs
= new ExecutorCompletionService(e);
int n = solvers.size();
List> futures
= new ArrayList>(n);
Result result = null;
try {
for (Callables : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
for (Futuref : futures)
// 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中
f.cancel(true);
}
if (result != null)
use(result);
}
总结
Dubbo 中的 Forking Cluster 多仓库文件/镜像下载(从最近的服务中心下载后终止其他下载过程) 多服务调用(天气预报服务,最先获取到的结果)
灵魂追问
通常处理结果还会用异步方式进行处理,如果采用这种方式,有哪些注意事项? 如果是你,你会选择使用无界队列吗?为什么?
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
评论
【深度学习】何恺明和刘壮新作:消除数据集偏差的十年之战
作 者丨量子位 来源丨量子位 编辑丨极市平台 MIT新晋副教授何恺明,新作新鲜出炉: 瞄准一个横亘在AI发展之路上十年之久的问题:数据集偏差。 该研究为何恺明在Meta期间与刘壮合作完成,他们在论文中指出...
机器学习初学者
0
AI Tools06-Ideogram,目前文字控制力最好的AI生图软件
现在很大AI工具的生图能力已经很强了,包括MJ,SD等,以及最近大家使用最多的comfy_UI来生成图像的自动化流程。可以说我们能够想象得到的艺术类风格,或写意创作或者写实类摄影效果,AI都已经拿捏的没问题了。就文生...
波悟馆
0
新发现,原型图居然也能用AI一键生成了!
作为一名产品经理,画原型图是否遇到以下痛点:1、出图效率低,就算是通用的注册登录模块,也要花很多时间;2、遇到新功能时,不知道原型图如何下手。其实不用担心,现在爆火的AI工具已经解决这些问题!我最近在把玩...
小狐学产品
0
何恺明新作来了!!
MIT新晋副教授何恺明,新作新鲜出炉:瞄准一个横亘在AI发展之路上十年之久的问题:数据集偏差。该研究为何恺明在Meta期间与刘壮合作完成,他们在论文中指出:尽管过去十多年里业界为构建更大、更多样化、更全面、偏...
NLP从入门到放弃
0
腾讯联合清华大学、香港科技大学推出全新图生视频模型「Follow-Your...
3 月 15 日,腾讯和清华大学、香港科技大学联合推出全新图生视频模型「Follow-Your-Click」,目前已经上架 GitHub(代码四月公开),同时还发表了一篇研究论文(DOI:2403.08268)。 这款图生视频模型主要功能包括局...
Machree
0
既可爱又性感,她是怎么做到的?
今日继续为大家带来台妹风采,聚焦于乐天啦啦队的菲菲。 菲菲展现出的风格独特且迷人,她巧妙地融合了可爱与性感两种特质,让人为之倾心。这种风格的展现,不仅让人感受到她的青春活力,更突显出她的女性魅力,实在...
全栈架构社区
0
聚焦两会︱何敬麟:湾区融合互促青年发展
聚焦两会 在十四届全国人大二次会议澳门代表团全体会议上,在接受媒体访问如何推动澳门青年积极融入粤港澳大湾区发展,澳区全国人大代表、澳门工商联会会长、澳门青创国际集团董事长何敬麟表示,今年是《粤港澳大湾...
proginn0346170850
0
魅族 21 PRO 正式发布,AI时代的手机有何不同?
2024 年 2 月 29 日,星纪魅族集团在珠海正式发布魅族 21 PRO 开放式 AI 终端。魅族 21 PRO 是面向 AI 时代全新打造的明日设备,集手机经典时代设计、交互、工艺之大成,以旗舰性能打磨满分答卷,为魅族开启 AI 时代...
城宇
0