JUC并发编程之CountDownLatch源码详解

如上这段话可能不是那么好理解,我举一个生活中很通俗的例子:

以及上述所说的自驾游场景,当然大家也可以自己发挥想象,项目中是否有场景能够用到该工具类。
/*** @author sunny* @date 2021/07/06 09:30* @description*/@Slf4jpublic class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {test01();// test02();// test03();}/*** 模拟高并发场景** @throws InterruptedException*/public static void test01() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {new Thread(() -> {try {log.info("{}:线程已就绪,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());countDownLatch.await();log.info("{}:线程已释放,,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();}}).start();}TimeUnit.SECONDS.sleep(3);System.out.println("\n ========================= \n");countDownLatch.countDown();}/*** 模拟家庭旅游场景*/public static void test02() {CountDownLatch countDownLatch = new CountDownLatch(11);for (int i = 0; i < 10; i++) {new Thread(() -> {log.info("{}:已经上车了", Thread.currentThread().getName());countDownLatch.countDown();}).start();}new Thread(() -> {try {TimeUnit.SECONDS.sleep(5);log.info("{}:五秒后已经上车了", Thread.currentThread().getName());countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();try {TimeUnit.SECONDS.sleep(2);System.out.println("\n ========================= \n");countDownLatch.await();log.info("开车旅游啦");} catch (InterruptedException e) {e.printStackTrace();}}/*** 模拟去峨眉山游玩,但是因为中途有道路需要修路,所以需要等待修完后才能出发* 那么修路的过程中*/public static void test03() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 1; i <= 9; i++) {new Thread(() -> {try {log.info("{}:线程,我要准备前往峨眉山,路被堵住了", Thread.currentThread().getName());countDownLatch.await();log.info("{}:线程,道路终于可以通行了", Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}).start();}TimeUnit.SECONDS.sleep(3);log.info("======================准备开始动工======================");for (int i = 1; i <= 9; i++) {Thread.sleep(300);int fi = i;new Thread(() -> {countDownLatch.countDown();log.info("{}:线程,已开工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());}).start();}TimeUnit.SECONDS.sleep(6);new Thread(() -> {log.info("{}:线程,已开工", Thread.currentThread().getName());countDownLatch.countDown();}).start();TimeUnit.SECONDS.sleep(8);for (int i = 0; i < 5; i++) {new Thread(() -> {try {countDownLatch.await();log.info("{}:线程,去峨眉山游玩啦", Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count); // 更新 state 值}
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); //获取当前节点的前驱节点if (p == head) {int r = tryAcquireShared(arg); // 尝试获取令牌if (r >= 0) { // 获取令牌成功setHeadAndPropagate(node, r); //传播链表p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收failed = false;return; // 获取令牌成功,退出自旋}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 阻塞当前线程throw new InterruptedException();}} finally {// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态if (failed)cancelAcquire(node);}}
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); // 封装节点// Try the fast path of enq; backup to full enq on failureNode pred = tail; // 获取末尾节点if (pred != null) {node.prev = pred; // 当前节点的前驱引用指向为predif (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点pred.next = node; // 原末尾节点后驱引用指向为当前节点return node;}}enq(node);return node;}


private Node enq(final Node node) {for (;;) {Node t = tail; // 获取末尾节点if (t == null) { // Must initialize // 构建双向链表if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); //获取当前节点的前驱节点if (p == head) {int r = tryAcquireShared(arg); // 尝试获取令牌if (r >= 0) { // 获取令牌成功setHeadAndPropagate(node, r); //传播链表p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收failed = false;return; // 获取令牌成功,退出自旋}}if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞parkAndCheckInterrupt()) // 阻塞当前线程throw new InterruptedException();}} finally {// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态if (failed)cancelAcquire(node);}}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,* or was recorded (as h.waitStatus either before* or after setHead) by a previous operation* (note: this uses sign-check of waitStatus because* PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,* or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态(h = head) == null || h.waitStatus < 0) {Node s = node.next; // 获取当前下一节点if (s == null || s.isShared()) // 判断下节点是否为共享节点doReleaseShared(); // 传播~~ 具体传播什么呢???}}
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}

private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。 如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】Node h = head;if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点int ws = h.waitStatus; // 获取head的节点状态if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒continue; // loop to recheck casesunparkSuccessor(h); // 唤醒线程 去获取令牌}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去continue; // loop on failed CAS}if (h == head) // loop if head changedbreak; // 跳出当前循环}}
private void unparkSuccessor(Node node) {// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现int ws = node.waitStatus;// 由于-1会小于0,所以更新改为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取第一个正常排队的节点Node s = node.next;//正常解锁流程不会走该if判断if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒if (s != null)LockSupport.unpark(s.thread);}
sync.releaseShared(1);public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { //通用释放令牌doReleaseShared(); //唤醒后驱节点return true;}return false;}
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
JUC并发编程之CountDownLatch源码讲解视频
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章


评论
