JUC并发编程之CountDownLatch源码详解
黎明大大
共 17529字,需浏览 36分钟
· 2021-07-08
如上这段话可能不是那么好理解,我举一个生活中很通俗的例子:
以及上述所说的自驾游场景,当然大家也可以自己发挥想象,项目中是否有场景能够用到该工具类。
/**
* @author sunny
* @date 2021/07/06 09:30
* @description
*/
@Slf4j
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
test01();
// test02();
// test03();
}
/**
* 模拟高并发场景
*
* @throws InterruptedException
*/
public static void test01() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
log.info("{}:线程已就绪,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
countDownLatch.await();
log.info("{}:线程已释放,,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(3);
System.out.println("\n ========================= \n");
countDownLatch.countDown();
}
/**
* 模拟家庭旅游场景
*/
public static void test02() {
CountDownLatch countDownLatch = new CountDownLatch(11);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
log.info("{}:已经上车了", Thread.currentThread().getName());
countDownLatch.countDown();
}).start();
}
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
log.info("{}:五秒后已经上车了", Thread.currentThread().getName());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("\n ========================= \n");
countDownLatch.await();
log.info("开车旅游啦");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 模拟去峨眉山游玩,但是因为中途有道路需要修路,所以需要等待修完后才能出发
* 那么修路的过程中
*/
public static void test03() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 1; i <= 9; i++) {
new Thread(() -> {
try {
log.info("{}:线程,我要准备前往峨眉山,路被堵住了", Thread.currentThread().getName());
countDownLatch.await();
log.info("{}:线程,道路终于可以通行了", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(3);
log.info("======================准备开始动工======================");
for (int i = 1; i <= 9; i++) {
Thread.sleep(300);
int fi = i;
new Thread(() -> {
countDownLatch.countDown();
log.info("{}:线程,已开工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());
}).start();
}
TimeUnit.SECONDS.sleep(6);
new Thread(() -> {
log.info("{}:线程,已开工", Thread.currentThread().getName());
countDownLatch.countDown();
}).start();
TimeUnit.SECONDS.sleep(8);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
countDownLatch.await();
log.info("{}:线程,去峨眉山游玩啦", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count); // 更新 state 值
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封装节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 获取末尾节点
if (pred != null) {
node.prev = pred; // 当前节点的前驱引用指向为pred
if (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点
pred.next = node; // 原末尾节点后驱引用指向为当前节点
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 获取末尾节点
if (t == null) { // Must initialize // 构建双向链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取当前下一节点
if (s == null || s.isShared()) // 判断下节点是否为共享节点
doReleaseShared(); // 传播~~ 具体传播什么呢???
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。 如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
private void unparkSuccessor(Node node) {
// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
int ws = node.waitStatus;
// 由于-1会小于0,所以更新改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点
Node s = node.next;
//正常解锁流程不会走该if判断
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
sync.releaseShared(1);
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //通用释放令牌
doReleaseShared(); //唤醒后驱节点
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
JUC并发编程之CountDownLatch源码讲解视频
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章
评论
光纤详解:光纤跳线如何分类,多向单模转换?
本文来自“光纤详解:光纤跳线如何分类,多向单模转换?”,光纤跳线作为光网络布线最基础的元件之一,被广泛应用于光纤链路的搭建中。如今,光纤制造商根据应用场景的不同推出众多类型的光纤跳线,如MPO/LC/SC/FC/ST光纤跳线,单工/双工光纤跳线,单模/多模光纤跳线等,它们之间各有特色,且不可替代。本
架构师技术联盟
0
微软开源MS-DOS操作系统源码,冲到GitHub第一了!
大家好,我是轩辕。这两天逛GitHub的时候,突然发现一个叫 MS-DOS的项目冲到Trending榜首了!定睛一看,微软官方啊,搜了一下才知道,原来前两天,微软把MS-DOS 4.0系统开源了!关于这个系统,估计现在很多程序员都不知道了,或者只在古老的教科书上看过这玩意儿。MS-DOS,全称为Mi
编程技术宇宙
6
网友发问:事业编一年6万,干35年退休挣200万,程序员一年60万,5年就挣300万,事业编再爽能有程序员干五年退休爽?
上一篇:阿里P9被裁,赔偿82w在职场中,我们不可避免地会面临多样的工作机会和选择。然而,如果我们仅将这些不同的工作机会仅以金钱作为衡量标准,那么这种比较就显得过于肤浅和狭隘。一些人可能会通过直接的数学计算来决定哪个职业道路更有利可图,但这种方法忽视了工作的本质、工作量的大小、职业成长的机会,以及经
开发者全社区
0
GPT的风也吹到了CV,详解自回归视觉模型的先驱! ImageGPT:使用图像序列训练图像 GPT模型
作者丨科技猛兽编辑丨极市平台导读 在 CIFAR-10 上,iGPT 使用 linear probing 实现了 96.3% 的精度,优于有监督的 Wide ResNet,并通过完全微调实现了 99.0% 的精度,匹配顶级监督预训练模型。本文目录1 自回归视觉模型的先驱 ImageGPT:
机器学习初学者
0
轻松学习C#:百度行驶证C++离线SDK接入详解
效果 先看最终效果SDK 拿到完整包如图,687M解压后看看内容发现有个readme.txt,那就先看看内容1:用vs2015打开sln工程,最好用vs2015 comunity版本,可微软官网下载。2:sdk的doc目录有pdf接口文档。3:工程总入口main.cpp、请参考示例实现您的功能。4:
DotNet NB
9
面试官:你了解 QPS、TPS、RT、吞吐量 这些高并发性能指标吗?
大家好,我是路人,更多优质文章见个人博客:http://itsoku.com一、QPS,每秒查询QPS:Queries Per Second意思是“每秒查询率”,是一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。互联网中,作为域名系统服务器的机器的性能
路人甲Java
0
只知道 Nginx 牛逼!却不知道它怎么支持百万并发?
前段时间在网上看到一个有意思的话题:只知道 Nginx 牛逼,却不知道它怎么支持百万并发?确实,这是一个好问题,面试常问,很多人都在这上面栽过跟头!所以,今天我们就来一起聊一聊这个话题。大家都知道,无论是运维、开发、测试,Nginx 技术栈的学习总是必不可少的,只是不同的岗位掌握的深度与广度不同而已
良许Linux
0
这都能卖了?曝光几款模拟经营游戏源码!可试玩体验
Cocos Store 经过不断积累与发展,已经成为开发者获取游戏开发资源,实现技术变现效率最高的平台。近年来,越来越多的独立开发者或企业纷纷入驻,而像模拟经营这类复杂系统的游戏,也纷纷出现在了 Cocos Store 资源商城中。今天就给大家介绍几款 2D\3D 模拟经营游戏源码,所有游戏可体验试
Creator星球游戏开发社区
10