JUC并发编程之CountDownLatch源码详解
如上这段话可能不是那么好理解,我举一个生活中很通俗的例子:
以及上述所说的自驾游场景,当然大家也可以自己发挥想象,项目中是否有场景能够用到该工具类。
/**
* @author sunny
* @date 2021/07/06 09:30
* @description
*/
@Slf4j
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
test01();
// test02();
// test03();
}
/**
* 模拟高并发场景
*
* @throws InterruptedException
*/
public static void test01() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
log.info("{}:线程已就绪,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
countDownLatch.await();
log.info("{}:线程已释放,,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(3);
System.out.println("\n ========================= \n");
countDownLatch.countDown();
}
/**
* 模拟家庭旅游场景
*/
public static void test02() {
CountDownLatch countDownLatch = new CountDownLatch(11);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
log.info("{}:已经上车了", Thread.currentThread().getName());
countDownLatch.countDown();
}).start();
}
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
log.info("{}:五秒后已经上车了", Thread.currentThread().getName());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("\n ========================= \n");
countDownLatch.await();
log.info("开车旅游啦");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 模拟去峨眉山游玩,但是因为中途有道路需要修路,所以需要等待修完后才能出发
* 那么修路的过程中
*/
public static void test03() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 1; i <= 9; i++) {
new Thread(() -> {
try {
log.info("{}:线程,我要准备前往峨眉山,路被堵住了", Thread.currentThread().getName());
countDownLatch.await();
log.info("{}:线程,道路终于可以通行了", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(3);
log.info("======================准备开始动工======================");
for (int i = 1; i <= 9; i++) {
Thread.sleep(300);
int fi = i;
new Thread(() -> {
countDownLatch.countDown();
log.info("{}:线程,已开工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());
}).start();
}
TimeUnit.SECONDS.sleep(6);
new Thread(() -> {
log.info("{}:线程,已开工", Thread.currentThread().getName());
countDownLatch.countDown();
}).start();
TimeUnit.SECONDS.sleep(8);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
countDownLatch.await();
log.info("{}:线程,去峨眉山游玩啦", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count); // 更新 state 值
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封装节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 获取末尾节点
if (pred != null) {
node.prev = pred; // 当前节点的前驱引用指向为pred
if (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点
pred.next = node; // 原末尾节点后驱引用指向为当前节点
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 获取末尾节点
if (t == null) { // Must initialize // 构建双向链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取当前下一节点
if (s == null || s.isShared()) // 判断下节点是否为共享节点
doReleaseShared(); // 传播~~ 具体传播什么呢???
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。 如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
private void unparkSuccessor(Node node) {
// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
int ws = node.waitStatus;
// 由于-1会小于0,所以更新改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点
Node s = node.next;
//正常解锁流程不会走该if判断
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
sync.releaseShared(1);
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //通用释放令牌
doReleaseShared(); //唤醒后驱节点
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
JUC并发编程之CountDownLatch源码讲解视频
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章
评论