Java并发之AQS详解

java宝典

共 28906字,需浏览 58分钟

 ·

2021-02-26 11:42

靓仔靓女们好,我们又见面了,我是java小杰要加油,现就职于京东,致力于分享java相关知识,包括但不限于并发、多线程、锁、mysql以及京东面试真题

AQS介绍

  • AQS全称是AbstractQueuedSynchronizer,是一个抽象队列同步器,JUC并发包中的大部分的并发工具类,都是基于AQS实现的,所以理解了AQS就算是四舍五入掌握了JUC了(好一个四舍五入学习法)那么AQS到底有什么神奇之处呢?有什么特点呢?让我们今天就来拔光它,一探究竟!

    • state:代表被抢占的锁的状态
    • 队列:没有抢到锁的线程会包装成一个node节点存放到一个双向链表中

AQS大概长这样,如图所示:

你说我随便画的,我可不是随便画的啊,我是有bear而来,来看下AQS基本属性的代码

那么这个Node节点又包含什么呢?来吧,展示。

那么我们就可以把这个队列变的更具体一点

怎么突然出来个exclusiveOwnerThread?还是保存当前获得锁的线程,哪里来的呢 还记得我们AQS一开始继承了一个类吗

这个exclusiveOwnerThread就是它里面的属性

再次回顾总结一下,AQS属性如下:

  1. state:代表被抢占的锁的状态
  2. exclusiveOwnerThread:当前获得锁的线程
  3. 队列:没有抢到锁的线程会包装成一个node节点存放到一个双向链表中
    • thread:  当前node节点包装的线程
    • waitStatus:当前节点的状态
    • pre: 当前节点的前驱节点
    • next: 当前节点的后继节点
    • nextWaiter:表示当前节点对锁的模式,独占锁的话就是null,共享锁为Node()
    • Node节点 :

好了,我们对AQS大概是什么东西什么结构长什么样子有了个清楚的认知,下面我们直接上硬菜,从源码角度分析下,AQS加锁,它这个结构到底是怎么变化的呢?

注:以下分析的都是独占模式下的加锁

  • 独占模式 : 锁只允许一个线程获得 NODE.EXCLUSIVE
  • 共享模式 :锁允许多个线程获得  NODE.SHARED

AQS加锁源码——acquire

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

乍一看这是什么啊,没关系,我们可以把它画成流程图方便我们理解,流程图如下

下面我们来一个一个分析,图文并茂,来吧宝贝儿。



AQS加锁源码——tryAcquire

 protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

这是什么情况?怎么直接抛出了异常?其实这是由AQS子类重写的方法,就类似lock锁,由子类定义尝试获取锁的具体逻辑

我们平常使用lock锁时往往如下 (若不想看lock锁怎么实现的可以直接跳转到下一节

ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try{
           //todo
        }finally {
            lock.unlock();
        }

我们看下lock.lock()源码

    public void lock() {
        sync.lock();
    }

这个sync又是什么呢,我们来看下lock类的总体属性就好了

所以我们来看下 默认非公平锁的加锁实现

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        
        final void lock() {
            //将state状态从0设为1 CAS方式
            if (compareAndSetState(01))
                //如果设定成功的话,则将当前线程(就是自己)设为占有锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //设置失败的话,就当前线程没有抢到锁,然后进行AQS父类的这个方法
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            //调用非公平锁的方法
            return nonfairTryAcquire(acquires);
        }
    }

现在压力又来到了nonfairTryAcquire(acquires)这里

 final boolean nonfairTryAcquire(int acquires) {
            //获得当前线程
            final Thread current = Thread.currentThread();
            //获得当前锁的状态
            int c = getState();
            //如果锁的状态是0的话,就表明还没有线程获取到这个锁
            if (c == 0) {
                //进行CAS操作,将锁的状态改为acquires,因为是可重入锁,所以这个数字可能是>0的数字
                if (compareAndSetState(0, acquires)) {
                    //将当前持有锁的线程设为自己
                    setExclusiveOwnerThread(current);
                    //返回 获取锁成功
                    return true;
                }
            }// 如果当前锁的状态不是0,判断当前获取锁的线程是不是自己,如果是的话
            else if (current == getExclusiveOwnerThread()) {
                //则重入数加acquires  (这里acquires是1)  1->2  3->4 这样
                int nextc = c + acquires;
                if (nextc < 0// overflow   异常检测
                    throw new Error("Maximum lock count exceeded");
                //将锁的状态设为当前值
                setState(nextc);
                //返回获取锁成功
                return true;
            }
            //当前获取锁的线程不是自己,获取锁失败,返回
            return false;
        }

由此可见,回到刚才的问题,AQS中的tryAcquire是由子类实现具体逻辑的

AQS加锁源码——addWaiter

如果我们获取锁失败的话,就要把当前线程包装成一个Node节点,那么具体是怎么包装的呢,也需要化妆师经纪人吗?我们来看下源码就知道了addWaiter(Node.EXCLUSIVE), arg) 这就代表添加的是独占模式的节点

 private Node addWaiter(Node mode) {
        //将当前线程包装成一个Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // 声明一个pred指针指向尾节点
        Node pred = tail;
        //尾节点不为空
        if (pred != null) {
            //将当前节点的前置指针指向pred
            node.prev = pred;
            //CAS操作将当前节点设为尾节点,tail指向当前节点
            if (compareAndSetTail(pred, node)) {
                //pred下一节点指针指向当前节点
                pred.next = node;
                //返回当前节点 (此时当前节点就已经是尾节点)
                return node;
            }
        }
        //如果尾节点为空或者CAS操作失败
        enq(node);
        return node;
    }

其中node的构造函数是这样的

 Node(Thread thread, Node mode) {     // Used by addWaiter
      this.nextWaiter = mode;
      this.thread = thread;
  }

我们可以通过图解的方法来更直观的来看下addWaiter做了什么

由图可知,如果曾经尾节点不为空的时候,node节点会加入到队列末尾,那么如果曾经尾节点为空或者CAS失败调用enq(node);会怎么样呢?

AQS加锁源码——enq

 private Node enq(final Node node) {
        //死循环,直到有返回值
        for (;;) {
            //声明一个t的指针指向tail
            Node t = tail;
            //如果尾巴节点为空
            if (t == null) { // Must initialize
                //则CAS设置一个节点为头节点(头节点并没有包装线程!)这也是延迟初始化头节点
                if (compareAndSetHead(new Node()))
                    //将尾指针指向头节点
                    tail = head;
            } else {  //如果尾节点不为空,则说明这是CAS失败
              // 将node节点前驱节点指向t
                node.prev = t;
                //继续CAS操作将自己设为尾节点
                if (compareAndSetTail(t, node)) {
                    //将t的next指针指向自己 (此时自己真的是尾节点了)
                    t.next = node;
                    //返回自己节点的前置节点,队列的倒数第二个
                    return t;
                }
            }
        }
    }
  • 队列中的头节点,是延迟初始化的,加锁时用到的时候才去输出话,并不是一开始就有这个头节点的
  • 头节点并不保存任何线程

end 尾分叉

     // 将node节点前驱节点指向t              
      node.prev = t;                    1
      //继续CAS操作将自己设为尾节点
      if (compareAndSetTail(t, node)) { 2
          //将t的next指针指向自己 (此时自己真的是尾节点了)
          t.next = node;                3
          //返回自己节点的前置节点,队列的倒数第二个
          return t;
      }

我们注意到,enq函数有上面三行代码,3是在2执行成功后才会执行的,由于我们这个代码无时无刻都在并发执行,存在一种可能就是

1执行成功,2执行失败(cas并发操作),3没有执行,所以就只有一个线程1,2,3都执行成功,其他线程1执行成功,2,3没有执行成功,出现尾分叉情况,如图所示

这些分叉失败的节点,在以后的循环中他们还会执行1,直总会指向新的尾节点,1,2,3这么执行,早晚会入队

AQS加锁源码——acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        // 是否有异常发生
        boolean failed = true;
        try {
            //中断标志
            boolean interrupted = false;
            //开始自旋,要么当前线程尝试去拿到锁,要么抛出异常
            for (;;) {
                //获得当前节点的上一个节点
                final Node p = node.predecessor();
                //判断这个节点是不是头节点,如果是头节点则获取锁
                if (p == head && tryAcquire(arg)) {
                    //将此节点设为头节点
                    setHead(node);
                    //原来的头节点出队
                    p.next = null// help GC
                    failed = false;
                    //返回是否中断
                    return interrupted;
                }
                // 说明p不是头节点
                // 或者
                // p是头节点但是获取锁失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 中断标志设为true
                    interrupted = true;
            }
        } finally {
           //如果有异常发生的话
            if (failed)
            //取消当前线程竞争锁,将当前node节点状态设置为cancel
                cancelAcquire(node);
        }
    }

其中有一行代码是setHead(node);

 private void setHead(Node node) {
        head = node;
        node.thread = null;  //将head节点的线程置为空
        node.prev = null;
    }
  • 为什么要将头节点的线程置为空呢,是因为在 tryAcquire(arg)中就已经记录了当前获取锁的线程了,在记录就多此一举了,我们看前文中提到的nonfairTryAcquire(acquires)其中有一段代码
   if (compareAndSetState(0, acquires)) {
          //将当前持有锁的线程设为自己   
          setExclusiveOwnerThread(current);  
          //返回 获取锁成功
          return true;
      }

可见setExclusiveOwnerThread(current);就已经记录了获得锁的线程了

我们acquireQueued返回值是中断标志,true表示中断过,false表示没有中断过,还记得我们一开始吗,回到最初的起点

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

如果返回了true,代表此线程有中断过,那么调用selfInterrupt();方法,将当前线程中断一下

 static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

AQS加锁源码——shouldParkAfterFailedAcquire

程序运行到这里就说明

 // 说明p不是头节点
// 或者
// p是头节点但是获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    // 中断标志设为true
    interrupted = true;
}

我们来分析下shouldParkAfterFailedAcquire(p, node)的源码里面到底做了什么?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前节点的前置节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
           //如果是SIGNAL(-1)状态直接返回true,代表此节点可以挂起
            //因为前置节点状态为SIGNAL在适当状态 会唤醒后继节点
            return true;
        if (ws > 0) {
            //如果是cancelled
            do {
                //则从后往前依此跳过cancelled状态的节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //将找到的符合标准的节点的后置节点指向当前节点
            pred.next = node;
        } else {
            //否则将前置节点等待状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

其中的node.prev = pred = pred.prev;可以看成

pred = pred.prev;

node.prev = pred;

可见一顿操作后,队列中跳过了节点状态为cancelled的节点

AQS加锁源码——parkAndCheckInterrupt

shouldParkAfterFailedAcquire返回true时就代表允许当前线程挂起然后就执行 parkAndCheckInterrupt()这个函数

 private final boolean parkAndCheckInterrupt() {
        // 挂起当前线程   线程卡在这里不再下执行,直到unpark唤醒
        LockSupport.park(this);
        return Thread.interrupted();
    }

所以当前线程就被挂起啦

AQS加锁源码——cancelAcquire

我们还记得前文中提到acquireQueued中的一段代码

  try {
          
        } finally {
            if (failed)
                cancelAcquire(node);
        }

这是抛出异常时处理节点的代码,下面来看下源代码

private void cancelAcquire(Node node) {
        //过滤掉无效节点
        if (node == null)
            return;
        //当前节点线程置为空
        node.thread = null;
        //获取当前节点的前一个节点
        Node pred = node.prev;
        //跳过取消的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //记录过滤后的节点的后置节点
        Node predNext = pred.next;
        //将当前节点状态改为CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 如果当前节点是tail尾节点 则将从后往前找到第一个非取消状态的节点设为tail尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
            //如果设置成功,则tail节点后面的节点会被设置为null
            compareAndSetNext(pred, predNext, null);
        } else {

            int ws;
            //如果当前节点不是首节点的后置节点
            if (pred != head &&  //并且
                    //如果前置节点的状态是SIGNAL
                ((ws = pred.waitStatus) == Node.SIGNAL || //或者
                        //状态小于0 并且设置状态为SIGNAL成功
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    //并且前置节点线程不为null时
                pred.thread != null) {
                //记录下当前节点的后置节点
                Node next = node.next;
                //如果后置节点不为空 并且后置节点的状态小于0
                if (next != null && next.waitStatus <= 0)
                    //把当前节点的前驱节点的后继指针指向当前节点的后继节点
                    compareAndSetNext(pred, predNext, next);
            } else {
                //唤醒当前节点的下一个节点
                unparkSuccessor(node);
            }
            //将当前节点下一节点指向自己
            node.next = node; // help GC
        }
    }

看起来太复杂了,不过没关系,我们可以拆开看,其中有这一段代码

       //当前节点线程置为空
        node.thread = null;
        //获取当前节点的前一个节点
        Node pred = node.prev;
        //跳过取消的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //记录过滤后的节点的后置节点
        Node predNext = pred.next;
        //将当前节点状态改为CANCELLED
        node.waitStatus = Node.CANCELLED;

如图所示

通过while循环从后往前找到signal状态的节点,跳过中间cancelled状态的节点,同时将当前节点状态改为CANCELLED

我们可以把这复杂的判断条件转换成图来直观的看一下

  • 当前节点是尾节点时,队列变成这样
  • 当前节点是head后继节点
  • 当前节点不是尾节点也不是头节点的后继节点(队列中的某个普通节点)

总结

太不容易了家人们,终于到了这里,我们再来总结一下整体的流程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1.基于AQS实现的子类去实现tryAcquire尝试获取锁

2.如果获取锁失败,则把当前节点通过addWaiter方法包装成node节点插入队列

  • 如果尾节点为空或者CAS操作失败则调用enq方法保证成功插入到队列,若节点为空则初始化头节点

3.acquireQueued方法,入队后的节点继续获取锁(此节点的前置节点是头节点)或者挂起

  • shouldParkAfterFailedAcquire判断节点是否应该挂起

    • 如果当前节点的前置节点是signal状态,则返回true,可以挂起
    • 如果当前节点的前置节点是cancelled,则队列会从当前节点的前一个节点开始从后向前遍历跳过cacelled状态的节点,将当前节点和非cacelled状态的节点连接起来,返回false,不可以挂起
    • 否则将前置节点等待状态设置为SIGNAL,返回false,不可以挂起
  • parkAndCheckInterrupt挂起当前线程

  • cancelAcquire将当前节点状态改为cancelld

4.selfInterrupt(); 设置中断标志,将中断补上 

往期精彩推荐




浏览 33
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报