想使用多线程来提高处理速度,却还不知道使用CountDownLatch与CyclicBarries?
共 3274字,需浏览 7分钟
·
2020-11-12 22:58
上一篇文章介绍了Java并发包中提供的实现限流的工具类,不了解的小伙伴可以点击查阅:Semaphore是如何实现限流效果的,今天我们学习下控制、协调多线程同步的工具类 CountDownLatch 与 CyclicBarrier。它们俩主要用在当我们想使用多线程技术来提高程序的运行效率时,常常需要汇总结果,那么如果控制线程之间的步调一致就是需要解决的问题。
背景
前两天在看一段同事小洁写的代码时,发现这样一个场景:她有一批数据需要处理落库,她为了加速程序执行的速度,自然而然的就想到了使用多线程,可真是一个有追求的妹子,同时也喜欢摄影和写代码,啧啧啧。让我们来看看她是怎么做的。
业务逻辑也比较简单,数据库某张临时表里有一大批个人开户数据,她需要分批去获取数据,然后经过加工处理给用户的计算出等级,写入到用户等级表,最终将处理的记录数上报给监控系统。
将逻辑抽象成伪代码如下,方便我们后续进一步分析:
int count = 0;
while(临时表数据不为空) {
// 从临时表查询开户数据
size = getTmpData();
// 根据开户状态处理用户等级
level = getUserLevel();
// 用户等级数据入库
save();
count+=size;
}
// 上报监控系统
report();
上面的串行处理逻辑,她认为太慢了,因为整个系统是串行执行的,那么怎么优化呢?
这种因为串行化执行导致耗时过长的系统逻辑,那么通用套路就是上多线程,如果你觉得这里理解的不够形象,这里七哥贴心的在给你上两张张图:
可以明显的看到同样的时间多线程的吞吐量是单线程的3倍,优化效果还是很明显的。
思路理清楚了,我们先来看下小洁是怎样实现代码的。
小洁的多线程实现
// 创建3个线程的线程池
Executor executor = Executors.newFixedThreadPool(3);
// 控制线程个数
int size = 0;
// 上报监控系统的总数
int count = 0;
List<int> countList;
while(临时表数据不为空) {
executor.execute(()-> {
// 从临时表查询开户数据
countList.add(getTmpData());
// 根据开户状态处理用户等级
level = getUserLevel();
// 用户等级数据入库
save();
// 线程数
size++;
});
// 防止过多线程,满三个处理
if(size % 3 == 0) {
// 汇总处理数量
count+=countList;
}
}
上面的实现,直接定义了一个计算器size开始等于0,在线程池中的每个线程处理完后将size++,在主线程中去判断size==3,则累加获取结果。
CountDownLatch 实现线程之间等待
针对上面逻辑,其实Java并发包中已经提供了相关的工具类支持,那就是CountDownLatch。上面的代码中,在while循环里,我们首先创建一个CountDownLatch,计数器的初始化等于3,然后在每次线程中执行计数器减1,减1的操作是通过调用 latch.countDown()
来实现的,在主线程中我们通过调用 latch.wait() 来实现对计数器为0的等待阻塞。
// 创建3个线程的线程池
Executor executor = Executors.newFixedThreadPool(3);
// 控制线程个数
int size = 0;
// 上报监控系统的总数
int count = 0;
List<int> countList;
while(临时表数据不为空) {
// 计数器初始化为3
CountDownLatch latch = new CountDownLatch(3);
executor.execute(()-> {
// 从临时表查询开户数据
countList.add(getTmpData());
// 根据开户状态处理用户等级
level = getUserLevel();
// 用户等级数据入库
save();
latch.countDown();
});
// 等待操作结束
latch.await();
// 汇总处理数量
count+=countList;
}
使用CyclicBarrier 实现
上面的如果使用CycleBarrier应该如何实现呢? 值得一提的是 CyclicBarrier 具有自动重置功能,当计数器减到0的时候会自动重置初始值,相当于继续按照设定阻塞执行。
public class CyclicBarrierTest implements Runnable {
// 创建3个线程的线程池
Executor executor = Executors.newFixedThreadPool(3);
// 控制线程个数
int size = 0;
// 上报监控系统的总数
int count = 0;
List<int> countList;
/**
* 创建4个屏障,处理完之后执行当前类的run方法
*/
final CyclicBarrier barrier = new CyclicBarrier(3, this);
while(临时表数据不为空) {
executor.execute(()-> {
// 从临时表查询开户数据
countList.add(getTmpData());
// 根据开户状态处理用户等级
level = getUserLevel();
// 用户等级数据入库
save();
// 插入一个屏障
barrier.await();
});
}
@Override
public void run() {
// 等待操作结束
latch.await();
// 汇总处理数量
count+=countList;
}
使用线程池创建了3个线程,分别去跑数然后将处理结果放到队列中,再由 CyclicBarrier 线程去计算总的处理结果。
总结
CountDownLatch
和 CyclicBarrier
是 Java 并发包提供的两个非常易用的线程同步工具类。这两个工具类用法的区别在这里还是有必要再强调一下:
「CountDownLatch」 主要用来解决一个线程等待多个线程的场景,而且「只能使用一次」,初始化的值也不能修改,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过,如果需要重新使用就得重新创建。但 「CyclicBarrier」 的计数器是可以「循环利用」的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数。
最后,欢迎在留言区与我分享你的想法。感谢阅读,如果你觉得这篇文章对你有帮助的话,记得点赞关注走起来。
1.微服务实战系列
4.中间件等
更多信息请关注公众号:「软件老王」,关注不迷路,软件老王和他的IT朋友们,分享一些他们的技术见解和生活故事。