JUC并发编程之ReentrantLock非公平锁源码详解

黎明大大

共 17827字,需浏览 36分钟

 ·

2021-07-03 16:43


点击上方蓝字 关注我吧



1
前言

大家伙好,已经时隔将近一个月没更新博客了,今天分享的内容点为ReentrantLock非公平锁源码,为什么分享这个呢?因为在并发情况下,难免会造成数据安全性问题,而ReentrantLock恰恰也是解决了并发情况下同时一起操作数据的安全性问题,同样该知识点也是基础中的重点啦~


2
什么是ReentrantLock


ReentLock是基于AQS框架进行实现,同样也是解决多线程并发手段之一,它的功能类似于Synchronized是一种互斥锁,可以保证线程安全。不同的在于它比Synchronized多了很多强大特性,例如支持公平锁、共享锁、手动加锁和释放锁以及允许中断操作


3
AQS是什么


AQS它的全称是 "AbstractQueuedSynchronizer" 它采用了模板方法设计模式在内部定义获取和释放同步状态的模板方法,并留下钩子函数提供给子类进行扩展,由子类来具体控制获取与释放锁的细节点,从而满足自身特性的需求。除此之外AQS中还提供好了线程阻塞与唤醒的方法,以及在AQS中还有一个特别重要的细节点,就是 "双向链表",当多个线程去同时去抢占锁的时候,会就未抢到锁的节点,封装成Node节点,进行等待下一波唤醒获取锁。


下图就是基于AQS所实现的一些比较强大的功能类,例如公平与非公平锁,信号量,线程池


在AQS抽象类中,有几个特别重要的属性,首先说下AQS父类AbstractOwnableSynchronizer的成员属性
// 标识拿到锁的是哪个线程private transient Thread exclusiveOwnerThread;


接着说下AQS本身成员属性
// 标识头节点private transient volatile Node head;// 标识尾节点private transient volatile Node tail;// 同步状态,为0时,说明可以抢锁private volatile int state;


最后是AQS内部类Node的几个重要属性
// 记录Node状态volatile int waitStatus;// 标识Node是哪个线程所持有volatile Node prev;// 前驱节点(当前Node的上一个是谁)volatile Node next;// 后继节点(当前Node的个一个是谁)volatile Thread thread;



4
ReentrantLock源码详细分析


如何使用ReentLock


下面是一个简单的lock案例,使用lock需要注意点,我们获取锁需要被try包裹,释放锁一定要在finally里面,为什么需要这么做呢?是因为如果多个线程同时进来抢锁,当其中一个线程抢到锁后,执行完业务逻辑还没来得及释放锁就发生了异常,就会导致锁没有被释放,从而该业务逻辑一直被卡住。所以为了避免这一现象的发生,我们的释放锁一定要写在finally里面啦~

public class ReentrantLockTest {    // 初始化Lock类    static final Lock lock = new ReentrantLock();    static int sum = 0;    public static void main(String[] args) throws InterruptedException {        for (int i = 0; i < 3; i++) {            Thread thread = new Thread(new Runnable() {                @Override                public void run() {                    try {                        // 获取锁                        lock.lock();                        for (int i1 = 0; i1 < 10000; i1++) {                            sum++;                        }                    } catch (Exception e) {                        e.printStackTrace();                    } finally {                        //释放锁                        lock.unlock();                    }                }            });            thread.start();        }        TimeUnit.SECONDS.sleep(1);        System.out.println(sum);    }}


源码分析
了解完如何使用lock锁后,开始进入我们的正式源码分析流程


在ReentLock里面,其中有两个构造方法值得我们注意

第一个是无参构造方法,它是非公平锁的一种实现方式,也是我们使用Lock锁默认的一种方式
第二个是有参构造方法,根据传入的布尔值,决定是否为公平锁还是非公平锁。


该文章所讲的是非公平锁,所以这里只放非公平锁的类结构图


非公平锁加锁流程
加锁流程从 “ lock.lock(); ” 开始
public void lock() {    sync.lock();}


加锁流程真正意义上的入口

首先尝试以最快的方式获取锁,当多个线程同时进来了,只会有其中一个线程以CAS的方式将state的值更新为1,前提条件是只有当state的值为0的时候才能更新成功,同时会记录获取锁的线程,若获取锁失败,则执行acquire方法

static final class NonfairSync extends Sync {    private static final long serialVersionUID = 7316153563782823691L;    /**     * Performs lock.  Try immediate barge, backing up to normal     * acquire on failure.     */    final void lock() {        //以cas的方式将AQS中的state属性值从0更新为1        if (compareAndSetState(0, 1))            //如果更新成功,则将当前线程设置为持有锁的线程            setExclusiveOwnerThread(Thread.currentThread());        else            //获取锁失败,则进入该方法            acquire(1);    }    protected final boolean tryAcquire(int acquires) {        return nonfairTryAcquire(acquires);    }}


来看看CAS是如何设置值的,它里面有四个参数,第一个参数:需要修改的对象,第二个参数:value变量的内存偏移地址,第三个参数:期望内存值,第四个参数:需要修改的值
流程概述: 首先AQS中默认的state值为0,当一个线程通过CAS设置值,会先找到AQS的对象,然后通过state的偏移量获取内存中的值,接着与我们传进来的期望内存值会和内存地址的值进行比较,如果一致话,则进行修改成功,否则修改失败。
protected final boolean compareAndSetState(int expect, int update) {    // See below for intrinsics setup to support this    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}


如果线程通过cas设置值失败(获取锁失败),则进入acquire方法,该方法的主要逻辑都在if判断中,里面有几个重要的方法,tryAcquire()、acquireQueued()、addWaiter(),加锁逻辑几乎都封装在这三个方法中,清楚了三个方法的逻辑,加锁的流程也差不多该清楚了哈
public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}


通用获取锁tryAcquire方法
为什么说是通用获取方法呢?因为在AQS中该方法是一个钩子方法,也就是我们前面所说的该方法是个模板方法,具体的加锁逻辑由子类自身的特性去具体实现的


在ReentrantLock中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。

protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}


找到关键点 -> 非公平类加锁入口


如下则是非公平锁的实现方式,在其底层调用了nonfairTryAcquire()方法

protected final boolean tryAcquire(int acquires) {    return nonfairTryAcquire(acquires);}


进入nonfairTryAcquire()方法后,内部代码精髓在于ReentrantLock的重入锁概念就是在该方法内实现的 ,用白话文解释重入锁的概念,在要进入家门前,需要通过人脸识别打开外门,识别成功后进入到外门内,但是家里还有内门需要打开,同样需要通过人眼识别打开内门,才能成功进入到自己温馨家里。不知道这段话是否理解,简单理解,两扇门的打开前提,必须是同一个人才能进入,否则将会被拒绝在外。
那么通过程序来理解的话,"Thread-1"线程在前面通过lock.lock()获取了锁,然后该线程在调用了其他方法内部又有一个lock.lock()方法,此时就有了嵌套锁,在程序里面,假如是同一个线程获取了两次锁,那么就会将state值进行自增+1,来代表该线程获取了几次锁。如果获取锁的线程与持有锁的线程不一致,那么就重入锁失败。
final boolean nonfairTryAcquire(int acquires) {    //记录当前线程    final Thread current = Thread.currentThread();    //获取当前state的值    int c = getState();    //如果state的值为0,则证明没有线程抢占锁    if (c == 0) {        // 通过cas进行加锁操作        if (compareAndSetState(0, acquires)) {            // 记录加锁线程            setExclusiveOwnerThread(current);            return true;        }    }    // 如果锁已经被抢占,且当前线程就是持有锁的线程,则说明该锁被重入    else if (current == getExclusiveOwnerThread()) {        //计算要更新后的state值        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        //给state设置值,该方式为非同步        setState(nextc);        //重入锁获取成功        return true;    }    //获取锁失败    return false;}


退回到上层的acquire()方法
如果tryAcquire()返回false,则说明线程获取锁失败,然后取反操作值为true,那么就可以进入下一个流程,然后执行addWaiter(Node.EXCLUSIVE), arg)方法,该方法是构建双向链表的关键点,同样也展现出线程获取锁失败后如何安全的加入到同步等待队列中的。先提前说明一下Node.EXCLUSIVE的值是为NULL, 让我们一起来看看该方法
public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}


线程获取锁失败,加入到同步队列
第一步骤:将当前线程封装为一个Node节点
第二步骤:获取链表末尾节点,如果当前AQS还没有生成双向链表的话,那么就会通过enq进行构建双向链表
private Node addWaiter(Node mode) {    //创建一个节点,将线程实例封装到Node节点内部,当前mode这里是为null的    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure     //获取末尾节点    Node pred = tail;    //如果末尾节点不为空,则证明前面已经有线程排队啦    if (pred != null) {        //将末尾节点的引用交给当前线程的前节点        node.prev = pred;        // 通过cas,将当前线程设置为链表的尾节点        if (compareAndSetTail(pred, node)) {            // 将前尾节点的next引用,指向当前节点,那么当前节点就是链表尾节点了            pred.next = node;            return node;        }    }    //构建链表核心方法    enq(node);    return node;}


进入到enq方法,来看看它是如何进行构建双向链表的。
当线程第一次进来,拿到当前链表尾节点,如果尾节点为null的话,说明当前链表还没有构成,则通过cas添加一个空的头节点,然后再将头节点的引用交给尾节点,那么此时意味着头尾是处于在同一个内存地址中,此时可能有小伙伴会有疑惑,当其中一个节点发生变化,那么另外一个节点也会发生变化,岂不是会发生很严重的后果?看到这先别着急。我们接着往下。
我们看到代码,当第一次循环如果没有尾节点就构造一个头尾节点出来,然后又继续执行第二次循环,第二次循环的时候则会将当前线程的prev引用指向为初始化尾节点,接着又通过CAS将当前线程设置为尾节点,然后将初始化尾节点的next引用指向为当前线程节点,这样是不是双向链表就构成了呢
private Node enq(final Node node) {        //自旋        for (;;) {            //获取当前双向链表的末尾节点            Node t = tail;            //如果当前还没有双向链表,则进行构建链表            if (t == null) { // Must initialize                //初始化一个head节点,空的node节点                if (compareAndSetHead(new Node()))                    //将head节点赋值给tail节点,这样的头尾节点都已生成好了                    tail = head;            } else {                //自旋第二次会进来                //头节点设置为 空节点                node.prev = t;                //通过cas,将当前节点设置为末尾节点                if (compareAndSetTail(t, node)) {                    //设置尾节点引用                    t.next = node;                    return t;                }            }        }    }


方便大家理解,我放上一张图


那么到此获取锁失败的线程,通过构建链表,然后假如到同步队列的逻辑到此就结束了,但是线程加入同步队列后会做什么我们并不清楚,这部分我们并不清楚,所以我们接着退回到acquire方法来,我们进入到acquireQueued方法内部,看看它里面做了些啥逻辑

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}


线程加入到同步队列后会做什么呢
acquireQueued方法的逻辑主要都在for循环里面,这是一个死循环,主要由两个if判断构成。


第一个if判断中,首先会判断当前线程的前驱节点是否为头节点,如果是则尝试获取锁,获取锁成功则将当前线程节点设置为头节点,为什么必须是前驱节点才尝试去获取锁,因为正常情况下,头节点才是持有锁的线程,头节点线程释放掉锁后,会唤醒后驱节点的线程,这个时候被唤醒后才会进行获取锁。举个很形象的例子,就比如火车站窗口排队买火车票,此时有人正在购买火车票,而排在第二个人可以知道第一个人是否购买完成,如果买完了我就可以去买票了啊,如果没买完,那么第二个人后面的人就只能老老实实的进行排队购票啊。


那么第二个if判断就是如果前面有人在排队购票,那么我就只能老老实实排在后面进行等待呗。再举个很形象的例子哈,有一款很好玩的游戏,因为这款游戏还没发售需要进行预约,此时我并不知道它什么时候才会发售,所以就先预约排队嘛,当要发售了,就会有工作人员去通知你,你预约的游戏可以已经发售了,你可以进行购买了,这就是排队与通知的。

 //Node: 为线程1的节点   arg:1final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        //只有获取到锁的线程才会跳出循环        for (;;) {            //获取当前线程节点的 prev 节点            final Node p = node.predecessor();             // 第二个线程 第一个条件为true , 第二个条件为false,因为第二个条件是 抢锁逻辑            if (p == head && tryAcquire(arg)) {                setHead(node); //将当前节点更新为头节点                p.next = null; // help GC                failed = false;                return interrupted; //正常情况下死循环的唯一出口            }             // 将上一个节点设置为独占锁-1,条件返回fasle,第二次自旋进来            if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞                //阻塞当前线程                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}


我们来看看shouldParkAfterFailedAcquire方法是如何进行判断当前线程是需要进行排队的。


我们可以前驱节点pred的状态会进行不同处理

1.pred状态为SIGNAL,则返回true,表示要阻塞当前线程
2.pred状态为CANCELLED,则一直往队列头部回溯直到找到一个状态不为CANCELLED的结点,将当前节点node挂在这个结点的后面
3.pred的状态为初始化状态,此时通过compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法将pred的状态改为SIGNAL。


其实这个方法主要作用就是确保当前结点的前驱结点的状态为SIGNAL,SIGNAL意味着线程释放锁后会唤醒后面阻塞的线程。毕竟只有确保能够被唤醒,当前线程才能放心的阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL) //状态为SIGNAL        /*         * This node has already set status asking a release         * to signal it, so it can safely park.         */        return true;    if (ws > 0) {   //状态为CANCELLED        /*         * Predecessor was cancelled. Skip over predecessors and         * indicate retry.         */        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {   //状态为初始化状态(ReentrentLock语境下)        /*         * waitStatus must be 0 or PROPAGATE.  Indicate that we         * need a signal, but don't park yet.  Caller will need to         * retry to make sure it cannot acquire before parking.         */        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}


最后我们来看看parkAndCheckInterrupt()阻塞方法
如果shouldParkAfterFailedAcquire返回true,则表示应该阻塞当前线程,执行parkAndCheckInterrupt方法,这个方法比较简单,底层调用了LockSupport来阻塞当前线程
private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}


非公平锁释放流程
解锁入口
释放锁的源码比加锁相对容易很多,我们来看看它的入口
public void unlock() {    sync.release(1);}


我们来看到release方法,该方法内部有两个if判断,我门先进入看看tryRelease做了些什么事情
public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}


首先获取当前state的状态值,如果当前释放锁的线程与持有锁的线程不一致,则抛出异常,如果一致则会判断state是否为0,如果不为0则不会进行是否锁,因为可能为重入锁,需要被释放多次,如果state为0,则将当前持有锁的线程设置为null,然后再将state设回给主内存中
protected final boolean tryRelease(int releases) {    int c = getState() - releases; //计算待更新的state值    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    if (c == 0) {  //待更新的state值为0,说明持有锁的线程未重入,一旦释放锁其他线程将能获取        free = true;        setExclusiveOwnerThread(null);  //清除锁的持有线程标记    }    setState(c);  //更新state值    return free;}


接着我们在退回到上一步release方法,当我们执行完tryRelease方法,如果返回true,则获取当前链表的头节点,接着有两个条件判断:
h != null :为了防止链表为空,就是没有任何线程处于在队列中,则没有线程可唤醒
h.waitStatus != 0:防止队列中虽说有线程存在,但是该线程还未被阻塞住,前面就有说到,线程在阻塞之前会将自己的前一个节点的状态设置为-1,否则不会进行阻塞自己
当两个条件都成立,则进入unparkSuccessor(),唤醒后驱节点线程
public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}


然后我们来看看最终该方法它是如何唤醒后驱节点的。
private void unparkSuccessor(Node node) {    // 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现    int ws = node.waitStatus;     // 由于-1会小于0,所以更新改为0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 获取第一个正常排队的节点    Node s = node.next;     //正常解锁流程不会走该if判断    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }     // 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒    if (s != null)        LockSupport.unpark(s.thread);}


如果线程被唤醒了,则会接着这个方法继续执行下去
private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}


那么到此ReentranLock非公平锁的加锁与释放锁的流程,到此就结束啦

ReentrantLock视频分享,结合博文观看效果更佳~


我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。


如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章


扫描二维码关注我
不定期更新技术文章哦



JUC并发编程之Synchronized关键字详解

JUC并发编程之MESI缓存一致协议详解

JUC并发编程之Volatile关键字详解

JUC并发编程之JMM内存模型详解

深入Hotspot源码与Linux内核理解NIO与Epoll



发现“在看”和“赞”了吗,因为你的点赞,让我元气满满哦
浏览 26
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报