JUC并发编程之CountDownLatch源码详解

黎明大大

共 17529字,需浏览 36分钟

 · 2021-07-08


点击上方蓝字 关注我吧



1
前言

关于JUC包下的工具类,到目前为止已经分享了ReentranLock、Semaphore这两个工具类,同样很多前置内容在前面两遍博文中也都要讲到,那么今天所分享的是CountDownLatch工具类、通过前面博文我们知道ReentranLock是独占锁模式、Semaphore是共享锁模式、那么CountDownLatch是什么模式呢?CountDownLatch它是闭锁模式。


什么是闭锁?
闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行
例如:
1.确保某个服务在其依赖的其他服务都启动之后才启动
2.等待某个操作的所有参与者都准备就绪才继续执行


2
什么是CountDownLatch


CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。


如上这段话可能不是那么好理解,我举一个生活中很通俗的例子:

一家人要外出自驾游,旅游的家庭成员总共有5位,只有当5位家庭成员全部上车了,爸爸才会开始开车出发对吗?假如爸爸现在通知家庭成员要出发了,其他的3位成员陆陆续续的上车,此时车上已有4位成员了,
还有一位小妹妹在上厕所,这个时候爸爸以及其他3位成员需要等待妹妹上车后,才会开车出发旅游。


3
CountDownLatch的使用场景


在我们平常开发过程中,需要对某些接口进行高平发测试,一般我们会想到通过jmeter性能工具进行压测,但是我们有没有java工具类去帮我实现这种并发测试的API工具呢?当然是有的,就是CountDownLacth工具类,我们可以在代码中模拟高并发测试接口的场景


以及上述所说的自驾游场景,当然大家也可以自己发挥想象,项目中是否有场景能够用到该工具类。


4
CountDownLatch源码详解


如何使用CountDownLatch
聊完CountDownLatch的使用场景后,我们来看看基于上面的场景通过CountDownLatch来实现相应的功能
/** * @author sunny * @date 2021/07/06 09:30 * @description */@Slf4jpublic class CountDownLatchTest {    public static void main(String[] args) throws InterruptedException {        test01();//        test02();//        test03();    }    /**     * 模拟高并发场景     *     * @throws InterruptedException     */    public static void test01() throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(1);        for (int i = 0; i < 10; i++) {            new Thread(() -> {                try {                    log.info("{}:线程已就绪,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());                    countDownLatch.await();                    log.info("{}:线程已释放,,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());                } catch (InterruptedException e) {                    e.printStackTrace();                }            }).start();        }        TimeUnit.SECONDS.sleep(3);        System.out.println("\n ========================= \n");        countDownLatch.countDown();    }    /**     * 模拟家庭旅游场景     */    public static void test02() {        CountDownLatch countDownLatch = new CountDownLatch(11);        for (int i = 0; i < 10; i++) {            new Thread(() -> {                log.info("{}:已经上车了", Thread.currentThread().getName());                countDownLatch.countDown();            }).start();        }        new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(5);                log.info("{}:五秒后已经上车了", Thread.currentThread().getName());                countDownLatch.countDown();            } catch (InterruptedException e) {                e.printStackTrace();            }        }).start();        try {            TimeUnit.SECONDS.sleep(2);            System.out.println("\n ========================= \n");            countDownLatch.await();            log.info("开车旅游啦");        } catch (InterruptedException e) {            e.printStackTrace();        }    }    /**     * 模拟去峨眉山游玩,但是因为中途有道路需要修路,所以需要等待修完后才能出发     * 那么修路的过程中     */    public static void test03() throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(10);        for (int i = 1; i <= 9; i++) {            new Thread(() -> {                try {                    log.info("{}:线程,我要准备前往峨眉山,路被堵住了", Thread.currentThread().getName());                    countDownLatch.await();                    log.info("{}:线程,道路终于可以通行了", Thread.currentThread().getName());                } catch (InterruptedException e) {                    e.printStackTrace();                }            }).start();        }        TimeUnit.SECONDS.sleep(3);        log.info("======================准备开始动工======================");        for (int i = 1; i <= 9; i++) {            Thread.sleep(300);            int fi = i;            new Thread(() -> {                countDownLatch.countDown();                log.info("{}:线程,已开工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());            }).start();        }        TimeUnit.SECONDS.sleep(6);        new Thread(() -> {            log.info("{}:线程,已开工", Thread.currentThread().getName());            countDownLatch.countDown();        }).start();        TimeUnit.SECONDS.sleep(8);        for (int i = 0; i < 5; i++) {            new Thread(() -> {                try {                    countDownLatch.await();                    log.info("{}:线程,去峨眉山游玩啦", Thread.currentThread().getName());                } catch (InterruptedException e) {                    e.printStackTrace();                }            }).start();        }    }}

源码分析
CountDownLatch有一个构造方法,传入的参数给state进行初始化,在CountDownLatch中state,我们就可以理解闭锁值,例如上面所说的家庭自驾游案例,当家庭成员全部到位,爸爸才会开车出发旅游对么?
public CountDownLatch(int count) {    if (count < 0) throw new IllegalArgumentException("count < 0");    this.sync = new Sync(count);  // 更新 state 值}


首先从 "countDownLatch.await()" 作为源码入口
public void await() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}


在该方法内有两个if判断,首先判断当前线程是否被中断,如果被中断了则直接抛出异常,而tryAcquireShared()方法则是通用模板方法,不同的子类根据自己的特性实现具体的逻辑
public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}


具体的加锁逻辑由子类自身的特性去具体实现的,在CountDownLatch中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。
protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();}


接着我们走到CountDownLatch所实现逻辑代码块,该方法很简单,就是一个三元表达式,如果当前线程获取的state为0则代表无需进行等待,否则需要进行入队等待。就如刚刚前面所讲的自驾游场景案例,只有当所有成员全部上车了,才会开车出发,到这里还是很好理解的对吗?哈哈哈,我们接着往下看。
protected int tryAcquireShared(int acquires) {    return (getState() == 0) ? 1 : -1;}


然后我们回退到上一步,看到acquireSharedInterruptibly()方法,如果state大于0则返回-1,从而会进入到doAcquireSharedInterruptibly()方法,这个方法与Semaphore()逻辑几乎一样,只是我们需要理解概念所一样而已。
public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}


然后我们进入到doAcquireSharedInterruptibly()方法,主要的逻辑都在自旋里面,但是外面同样也有个比较重要的方法,就是addWaiter()方法,该方法传入的参数值为 "Node.SHARED" ,而SHARED的值就是new Node() 也就是创建了一个空的节点,然后我们来看看addWaiter()方法其内部逻辑做了些什么事情?
private void doAcquireSharedInterruptibly(int arg)    throws InterruptedException {    final Node node = addWaiter(Node.SHARED);  // 构建双向链表 或 入队操作    boolean failed = true;    try {        for (;;) { // 自旋            final Node p = node.predecessor();  //获取当前节点的前驱节点            if (p == head) {                int r = tryAcquireShared(arg);  // 尝试获取令牌                if (r >= 0) {  // 获取令牌成功                    setHeadAndPropagate(node, r);  //传播链表                    p.next = null; // help GC    将前驱节点的引用指向为NULL,待垃圾回收器回收                    failed = false;                    return;  // 获取令牌成功,退出自旋                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())   // 阻塞当前线程                throw new InterruptedException();        }    } finally {        // 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态        if (failed)            cancelAcquire(node);    }}


使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点
首先将当前线程封装为node节点,接着获取tail节点,判断当前AQS中是否存在双向链表,如果存在的话,将node前驱节点引用指向tail节点,通过cas将node节点设置为末尾节点,如果设置成功则将tail节点的后驱引用指向node,那么node就顺理成章的成了双向链表的末尾节点了。关于这里我们其实需要思考一个问题,在多线程情况下同时通过cas去设置尾节点,此时只会有一个线程设置成功且返回出去,那接下来的线程该怎么办呢?且不急,带着这个疑问我们进入到enq方法
private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);   // 封装节点    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;  // 获取末尾节点    if (pred != null) {        node.prev = pred;   // 当前节点的前驱引用指向为pred        if (compareAndSetTail(pred, node)) {  // 将当前节点设置为链表末尾节点            pred.next = node;  // 原末尾节点后驱引用指向为当前节点            return node;         }    }    enq(node);    return node;}


基于FIFO入队流程图


通过如下图理解上面这段话,我相信应该是能够明白的



使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点
得勒,进来就是一层自旋,注意这里的精华就是自旋,以及上面所提到多线程通过cas设置尾节点失败的解决方案就在此方法。
进入自旋获取链表的末尾节点,如果获取tail为null则证明当前并没有构成双向链表,接着通过cas去设置head,然后将head指向tail,这样双向链表就完成了,如果获取tail不为null,将node前驱引用指向tail节点,然后tail的后驱节点引用指向node节点,然后返回出去。那如果设置失败了怎么办呢?回到上面的问题,问题不大,这方法不是自旋嘛,它会一直自旋到你设置成功为止,才退出自旋。
private Node enq(final Node node) {    for (;;) {        Node t = tail; // 获取末尾节点        if (t == null) { // Must initialize   // 构建双向链表            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}


如果通过cas设置不成功,就一直进行自旋,直到设置成功才退出循环。



接着,回退到doAcquireSharedInterruptibly()方法,通过上面的流程下来,我们就知道node节点现在已经成功入队到双向链表中,接着判断如果当前节点的前驱节点是为头节点此时会尝试获取令牌,如果获取失败则将线程进行阻塞,同理当前节点的前驱节点不是链表的头节点,也会将当前线程进行阻塞。无论如何只要令牌没有了,就得老老实实的在队列中进行呆着,直到下一次的唤醒。
那如果线程为头节点且获取令牌成功了,setHeadAndPropagate()方法又会做些什么事情呢?带着这个疑问,我们进去一探究竟
private void doAcquireSharedInterruptibly(int arg)    throws InterruptedException {    final Node node = addWaiter(Node.SHARED);  // 构建双向链表 或 入队操作    boolean failed = true;    try {        for (;;) { // 自旋            final Node p = node.predecessor();  //获取当前节点的前驱节点            if (p == head) {                int r = tryAcquireShared(arg);  // 尝试获取令牌                if (r >= 0) {  // 获取令牌成功                    setHeadAndPropagate(node, r);  //传播链表                    p.next = null; // help GC    将前驱节点的引用指向为NULL,待垃圾回收器回收                    failed = false;                    return;  // 获取令牌成功,退出自旋                }            }            if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞                parkAndCheckInterrupt())   // 阻塞当前线程                throw new InterruptedException();        }    } finally {        // 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态        if (failed)            cancelAcquire(node);    }}


首先我们看到该方法的入参内容,node:当前获取令牌线程节点,propagate: 值是根据获取state是否等于0判断,如果等0这么为1否则为-1
该方法主要作用在于两点,第一点:将当前节点设置为头节点,第二点:自动唤醒下一个节点
private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below    setHead(node);    /*     * Try to signal next queued node if:     *   Propagation was indicated by caller,     *     or was recorded (as h.waitStatus either before     *     or after setHead) by a previous operation     *     (note: this uses sign-check of waitStatus because     *      PROPAGATE status may transition to SIGNAL.)     * and     *   The next node is waiting in shared mode,     *     or we don't know, because it appears null     *     * The conservatism in both of these checks may cause     * unnecessary wake-ups, but only when there are multiple     * racing acquires/releases, so most need signals now or soon     * anyway.     */    if (propagate > 0 || h == null || h.waitStatus < 0 ||   // 还有令牌可获取 || 头节点状态处于等待状态        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;  // 获取当前下一节点        if (s == null || s.isShared())  // 判断下节点是否为共享节点            doReleaseShared();  // 传播~~ 具体传播什么呢???    }}

稍微可以看下设置头节点方法,也就是出队操作,主要就是将当前线程设置为头节点,然后将当前节点的前驱节点引用指向为null,配合方法外,会将之前的头节点的next节点设置为null,那么之前的头节点也就自然会被垃圾回收器进行
private void setHead(Node node) {    head = node;    node.thread = null;    node.prev = null;}


基于FIFO出队流程图



又一次来到自旋,首先验证链表中是否还存在多个节点,如果存在且状态为SIGNAL会将head的后驱节点进行唤醒。这里没啥太多好说的,就是一个传播概念,当你有多个节点在阻塞中,当state为0,是不是我的所有阻塞节点都需要被唤醒,然后执行后续的逻辑对么?
private void doReleaseShared() {    /*     * Ensure that a release propagates, even if there are other     * in-progress acquires/releases.  This proceeds in the usual     * way of trying to unparkSuccessor of head if it needs     * signal. But if it does not, status is set to PROPAGATE to     * ensure that upon release, propagation continues.     * Additionally, we must loop in case a new node is added     * while we are doing this. Also, unlike other uses of     * unparkSuccessor, we need to know if CAS to reset status     * fails, if so rechecking.     */    for (;;) {  // 自旋   可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。 如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】        Node h = head;          if (h != null && h != tail) {  // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点            int ws = h.waitStatus;   // 获取head的节点状态            if (ws == Node.SIGNAL) {  // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒                    continue;            // loop to recheck cases                unparkSuccessor(h);  // 唤醒线程 去获取令牌            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE   PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去                continue;                // loop on failed CAS        }        if (h == head)                   // loop if head changed            break;   // 跳出当前循环    }}

unparkSuccessor()方法很简单,在正常流程下它只会通过LockSupport.unpark(),将下一节点进行唤醒
private void unparkSuccessor(Node node) {    // 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现    int ws = node.waitStatus;        // 由于-1会小于0,所以更新改为0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点 Node s = node.next; //正常解锁流程不会走该if判断 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒 if (s != null) LockSupport.unpark(s.thread);}


接下来看看countDown()方法到底也做了些什么流程操作
首先从 "countDownLatch.countDown()" 作为源码入口
sync.releaseShared(1);


我们来看到releaseShared方法,该方法内部有两个核心方法,我们先进入看看tryReleaseShared做了些什么事情
public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {  //通用释放令牌        doReleaseShared();  //唤醒后驱节点        return true;    }    return false;}


我们又看到了自旋,判断当前的state值是否等于0,等于0则代表需要提前的准备的线程都已就绪,主线程也可以执行剩下的业务逻辑啦,那如果不为0怎么办?一直自减,直到减到state为0,然后将链表内的线程全部进行唤醒。也就是会走到我上面所说到的doReleaseShared()方法
protected boolean tryReleaseShared(int releases) {    // Decrement count; signal when transition to zero    for (;;) {        int c = getState();        if (c == 0)            return false;        int nextc = c-1;        if (compareAndSetState(c, nextc))            return nextc == 0;    }}

那么到这CountDownLatch源码分析到此结束了,相信大家伙如果看过我前面两篇文章,再看这篇博文会发现理解起来非常简单的。


JUC并发编程之CountDownLatch源码讲解视频


我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。


如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章


扫描二维码关注我
不定期更新技术文章哦



JUC并发编程之Semaphore源码详解

JUC并发编程之ReentrantLock非公平锁源码详解

JUC并发编程之Synchronized关键字详解

JUC并发编程之MESI缓存一致协议详解

JUC并发编程之Volatile关键字详解

JUC并发编程之JMM内存模型详解

深入Hotspot源码与Linux内核理解NIO与Epoll



发现“在看”和“赞”了吗,因为你的点赞,让我元气满满哦
浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报