Java多线程之CyclicBarrier
共 3820字,需浏览 8分钟
·
2021-11-30 14:22
这里就JUC包中的CyclicBarrier类做相关介绍
概述
JUC中的CyclicBarrier类是一个并发控制工具。其可以使线程在栅栏处进行等待。当指定数量的线程全部到达栅栏处后栅栏才会打开,从而使各线程结束阻塞继续向下执行。其主要方法如下所示,可以看到在线程全部到达栅栏时,还可以通过barrierAction参数设置准备打开栅栏前需执行的任务。其中,该任务由最后一个到达栅栏的线程负责执行。具体地,线程调用await方法实现告诉CyclicBarrier自己已经到达栅栏处,并阻塞等待栅栏打开
// 创建一个指定计数器值的CyclicBarrier实例
public CyclicBarrier(int parties);
// 创建一个指定计数器值的CyclicBarrier实例, 并指定栅栏打开前需执行的任务
public CyclicBarrier(int parties, Runnable barrierAction);
// 线程阻塞等待栅栏打开
public int await() throws InterruptedException, BrokenBarrierException;
// 支持超时的await方法
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;
// 唤醒其他正在栅栏处被阻塞的线程(即抛出BrokenBarrierException异常), 同时将CyclicBarrier实例恢复为初始化状态,以便下一次使用
public void reset();
基本实践
下面即是一个CyclicBarrier的基本实践示例
public class CyclicBarrierTest1 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
@Test
public void test1() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Runnable initTask = () -> {
info("---------------------------------");
};
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, initTask);
Stream.of("张三","李四","王二")
.map( name -> new PlayGame(name, cyclicBarrier) )
.forEach(playGame -> {
threadPool.execute(playGame);
} );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
info("Game Over");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String threadName = Thread.currentThread().getName();
String log = "["+time+"] "+ msg+" <"+threadName+">";
System.out.println(log);
}
/**
* 模拟业务耗时
*/
private static void doSomeWork() {
try{
Integer second = RandomUtils.nextInt(3,20);
System.out.println("second: " + second);
Thread.sleep( second * 1000 );
}catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
}
}
@AllArgsConstructor
private static class PlayGame implements Runnable{
private String name;
private CyclicBarrier cyclicBarrier;
@Override
public void run() {
// 模拟业务耗时
doSomeWork();
info(name + " 上线");
// 阻塞等待其他玩家上线
try{
cyclicBarrier.await();
}catch (Exception e) {
System.out.println( "Happen Exception: " + e);
}
info(name + " 选择角色 开始");
// 模拟业务耗时
doSomeWork();
info(name + " 选择角色 结束");
// 阻塞等待其他玩家选择角色
try{
cyclicBarrier.await();
}catch (Exception e) {
System.out.println( "Happen Exception: " + e);
}
info(name + " 开始游戏");
}
}
}
从测试结果可以看出,当用户 开始选择角色 或 开始游戏时,各线程是同时开始的。至此也可以看出其与CountDownLatch的显著区别,后者是一次性的,而前者CyclicBarrier则可以重复使用
基本原理
通过上面的代码示例,可以看到CyclicBarrier与CountDownLatch相比功能很类似。只不过前者可以重复使用,而后者则是一次性的。但二者在实现上却大相径庭,CountDownLatch是直接基于AQS实现的。而CyclicBarrier则是利用ReentrantLock、Condition进行实现的。具体地,当线程调用CyclicBarrier的await方法时,如果未达到指定数量时,则是通过Condition条件变量的await方法进行阻塞的;如果是最后一个线程则会通过Condition条件变量的signalAll方法来唤醒所有被阻塞的线程
与此同时,由于CyclicBarrier是可重复使用的。故每一轮结束后,其内部会通过nextGeneration方法生成所谓的下一代CyclicBarrier。本质上相当于重新实例化了一次CyclicBarrier
Note
在实际使用CyclicBarrier过程中,需要非常小心处理BrokenBarrierException异常。本文示例代码为了简便,故省略了异常处理过程。因为发生该异常说明栅栏被损坏了。推荐的处理措施有:一方面,调用CyclicBarrier的reset方法,来唤醒其他由于调用await方法而被阻塞的线程以避免一直被阻塞,同时将CyclicBarrier实例恢复至初始化状态;另一方面,推荐使用具有超时机制的await方法,以避免线程被永久性阻塞
参考文献
Java并发编程之美 翟陆续、薛宾田著