Java并发之AQS详解
共 28906字,需浏览 58分钟
·
2021-02-26 11:42
靓仔靓女们好,我们又见面了,我是java小杰要加油,现就职于京东,致力于分享java相关知识,包括但不限于并发、多线程、锁、mysql以及京东面试真题
AQS介绍
AQS全称是AbstractQueuedSynchronizer,是一个抽象队列同步器,JUC并发包中的大部分的并发工具类,都是基于AQS实现的,所以理解了AQS就算是四舍五入掌握了JUC了(好一个四舍五入学习法)那么AQS到底有什么神奇之处呢?有什么特点呢?让我们今天就来拔光它,一探究竟!
state:代表被抢占的锁的状态 队列:没有抢到锁的线程会包装成一个node节点存放到一个双向链表中
AQS大概长这样,如图所示:
你说我随便画的,我可不是随便画的啊,我是有bear而来,来看下AQS基本属性的代码
那么这个Node节点又包含什么呢?来吧,展示。
那么我们就可以把这个队列变的更具体一点
怎么突然出来个exclusiveOwnerThread
?还是保存当前获得锁的线程,哪里来的呢
还记得我们AQS一开始继承了一个类吗
这个exclusiveOwnerThread
就是它里面的属性
再次回顾总结一下,AQS属性如下:
state:代表被抢占的锁的状态 exclusiveOwnerThread:当前获得锁的线程 队列:没有抢到锁的线程会包装成一个node节点存放到一个双向链表中 thread: 当前node节点包装的线程 waitStatus:当前节点的状态 pre: 当前节点的前驱节点 next: 当前节点的后继节点 nextWaiter:表示当前节点对锁的模式,独占锁的话就是null,共享锁为Node() Node节点 :
好了,我们对AQS大概是什么东西什么结构长什么样子有了个清楚的认知,下面我们直接上硬菜,从源码角度分析下,AQS加锁,它这个结构到底是怎么变化的呢?
注:以下分析的都是独占模式下的加锁
独占模式 : 锁只允许一个线程获得 NODE.EXCLUSIVE 共享模式 :锁允许多个线程获得 NODE.SHARED
AQS加锁源码——acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
乍一看这是什么啊,没关系,我们可以把它画成流程图方便我们理解,流程图如下
下面我们来一个一个分析,图文并茂,来吧宝贝儿。
AQS加锁源码——tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这是什么情况?怎么直接抛出了异常?其实这是由AQS子类重写的方法,就类似lock锁,由子类定义尝试获取锁的具体逻辑
我们平常使用lock锁时往往如下 (若不想看lock锁怎么实现的可以直接跳转到下一节)
ReentrantLock lock = new ReentrantLock();
lock.lock();
try{
//todo
}finally {
lock.unlock();
}
我们看下lock.lock()
源码
public void lock() {
sync.lock();
}
这个sync
又是什么呢,我们来看下lock类的总体属性就好了
所以我们来看下 默认非公平锁的加锁实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//将state状态从0设为1 CAS方式
if (compareAndSetState(0, 1))
//如果设定成功的话,则将当前线程(就是自己)设为占有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//设置失败的话,就当前线程没有抢到锁,然后进行AQS父类的这个方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
//调用非公平锁的方法
return nonfairTryAcquire(acquires);
}
}
现在压力又来到了nonfairTryAcquire(acquires)
这里
final boolean nonfairTryAcquire(int acquires) {
//获得当前线程
final Thread current = Thread.currentThread();
//获得当前锁的状态
int c = getState();
//如果锁的状态是0的话,就表明还没有线程获取到这个锁
if (c == 0) {
//进行CAS操作,将锁的状态改为acquires,因为是可重入锁,所以这个数字可能是>0的数字
if (compareAndSetState(0, acquires)) {
//将当前持有锁的线程设为自己
setExclusiveOwnerThread(current);
//返回 获取锁成功
return true;
}
}// 如果当前锁的状态不是0,判断当前获取锁的线程是不是自己,如果是的话
else if (current == getExclusiveOwnerThread()) {
//则重入数加acquires (这里acquires是1) 1->2 3->4 这样
int nextc = c + acquires;
if (nextc < 0) // overflow 异常检测
throw new Error("Maximum lock count exceeded");
//将锁的状态设为当前值
setState(nextc);
//返回获取锁成功
return true;
}
//当前获取锁的线程不是自己,获取锁失败,返回
return false;
}
由此可见,回到刚才的问题,AQS中的tryAcquire
是由子类实现具体逻辑的
AQS加锁源码——addWaiter
如果我们获取锁失败的话,就要把当前线程包装成一个Node节点,那么具体是怎么包装的呢,也需要化妆师经纪人吗?我们来看下源码就知道了addWaiter(Node.EXCLUSIVE), arg)
这就代表添加的是独占模式的节点
private Node addWaiter(Node mode) {
//将当前线程包装成一个Node节点
Node node = new Node(Thread.currentThread(), mode);
// 声明一个pred指针指向尾节点
Node pred = tail;
//尾节点不为空
if (pred != null) {
//将当前节点的前置指针指向pred
node.prev = pred;
//CAS操作将当前节点设为尾节点,tail指向当前节点
if (compareAndSetTail(pred, node)) {
//pred下一节点指针指向当前节点
pred.next = node;
//返回当前节点 (此时当前节点就已经是尾节点)
return node;
}
}
//如果尾节点为空或者CAS操作失败
enq(node);
return node;
}
其中node的构造函数是这样的
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
我们可以通过图解的方法来更直观的来看下addWaiter
做了什么
由图可知,如果曾经尾节点不为空的时候,node节点会加入到队列末尾,那么如果曾经尾节点为空或者CAS失败调用enq(node);
会怎么样呢?
AQS加锁源码——enq
private Node enq(final Node node) {
//死循环,直到有返回值
for (;;) {
//声明一个t的指针指向tail
Node t = tail;
//如果尾巴节点为空
if (t == null) { // Must initialize
//则CAS设置一个节点为头节点(头节点并没有包装线程!)这也是延迟初始化头节点
if (compareAndSetHead(new Node()))
//将尾指针指向头节点
tail = head;
} else { //如果尾节点不为空,则说明这是CAS失败
// 将node节点前驱节点指向t
node.prev = t;
//继续CAS操作将自己设为尾节点
if (compareAndSetTail(t, node)) {
//将t的next指针指向自己 (此时自己真的是尾节点了)
t.next = node;
//返回自己节点的前置节点,队列的倒数第二个
return t;
}
}
}
}
队列中的头节点,是延迟初始化的,加锁时用到的时候才去输出话,并不是一开始就有这个头节点的 头节点并不保存任何线程
end 尾分叉
// 将node节点前驱节点指向t
node.prev = t; 1
//继续CAS操作将自己设为尾节点
if (compareAndSetTail(t, node)) { 2
//将t的next指针指向自己 (此时自己真的是尾节点了)
t.next = node; 3
//返回自己节点的前置节点,队列的倒数第二个
return t;
}
我们注意到,enq
函数有上面三行代码,3是在2执行成功后才会执行的,由于我们这个代码无时无刻都在并发执行,存在一种可能就是
1执行成功,2执行失败(cas并发操作),3没有执行,所以就只有一个线程1,2,3都执行成功,其他线程1执行成功,2,3没有执行成功,出现尾分叉情况,如图所示
这些分叉失败的节点,在以后的循环中他们还会执行1,直总会指向新的尾节点,1,2,3这么执行,早晚会入队
AQS加锁源码——acquireQueued
final boolean acquireQueued(final Node node, int arg) {
// 是否有异常发生
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
//开始自旋,要么当前线程尝试去拿到锁,要么抛出异常
for (;;) {
//获得当前节点的上一个节点
final Node p = node.predecessor();
//判断这个节点是不是头节点,如果是头节点则获取锁
if (p == head && tryAcquire(arg)) {
//将此节点设为头节点
setHead(node);
//原来的头节点出队
p.next = null; // help GC
failed = false;
//返回是否中断
return interrupted;
}
// 说明p不是头节点
// 或者
// p是头节点但是获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 中断标志设为true
interrupted = true;
}
} finally {
//如果有异常发生的话
if (failed)
//取消当前线程竞争锁,将当前node节点状态设置为cancel
cancelAcquire(node);
}
}
其中有一行代码是setHead(node);
private void setHead(Node node) {
head = node;
node.thread = null; //将head节点的线程置为空
node.prev = null;
}
为什么要将头节点的线程置为空呢,是因为在 tryAcquire(arg)
中就已经记录了当前获取锁的线程了,在记录就多此一举了,我们看前文中提到的nonfairTryAcquire(acquires)
其中有一段代码
if (compareAndSetState(0, acquires)) {
//将当前持有锁的线程设为自己
setExclusiveOwnerThread(current);
//返回 获取锁成功
return true;
}
可见setExclusiveOwnerThread(current);
就已经记录了获得锁的线程了
我们acquireQueued
返回值是中断标志,true表示中断过,false表示没有中断过,还记得我们一开始吗,回到最初的起点
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果返回了true,代表此线程有中断过,那么调用selfInterrupt();
方法,将当前线程中断一下
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
AQS加锁源码——shouldParkAfterFailedAcquire
程序运行到这里就说明
// 说明p不是头节点
// 或者
// p是头节点但是获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 中断标志设为true
interrupted = true;
}
我们来分析下shouldParkAfterFailedAcquire(p, node)
的源码里面到底做了什么?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前节点的前置节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果是SIGNAL(-1)状态直接返回true,代表此节点可以挂起
//因为前置节点状态为SIGNAL在适当状态 会唤醒后继节点
return true;
if (ws > 0) {
//如果是cancelled
do {
//则从后往前依此跳过cancelled状态的节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//将找到的符合标准的节点的后置节点指向当前节点
pred.next = node;
} else {
//否则将前置节点等待状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
其中的node.prev = pred = pred.prev;
可以看成
pred = pred.prev;
node.prev = pred;
可见一顿操作后,队列中跳过了节点状态为cancelled的节点
AQS加锁源码——parkAndCheckInterrupt
当shouldParkAfterFailedAcquire
返回true时就代表允许当前线程挂起然后就执行 parkAndCheckInterrupt()
这个函数
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程 线程卡在这里不再下执行,直到unpark唤醒
LockSupport.park(this);
return Thread.interrupted();
}
所以当前线程就被挂起啦
AQS加锁源码——cancelAcquire
我们还记得前文中提到acquireQueued
中的一段代码
try {
} finally {
if (failed)
cancelAcquire(node);
}
这是抛出异常时处理节点的代码,下面来看下源代码
private void cancelAcquire(Node node) {
//过滤掉无效节点
if (node == null)
return;
//当前节点线程置为空
node.thread = null;
//获取当前节点的前一个节点
Node pred = node.prev;
//跳过取消的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//记录过滤后的节点的后置节点
Node predNext = pred.next;
//将当前节点状态改为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是tail尾节点 则将从后往前找到第一个非取消状态的节点设为tail尾节点
if (node == tail && compareAndSetTail(node, pred)) {
//如果设置成功,则tail节点后面的节点会被设置为null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//如果当前节点不是首节点的后置节点
if (pred != head && //并且
//如果前置节点的状态是SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL || //或者
//状态小于0 并且设置状态为SIGNAL成功
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
//并且前置节点线程不为null时
pred.thread != null) {
//记录下当前节点的后置节点
Node next = node.next;
//如果后置节点不为空 并且后置节点的状态小于0
if (next != null && next.waitStatus <= 0)
//把当前节点的前驱节点的后继指针指向当前节点的后继节点
compareAndSetNext(pred, predNext, next);
} else {
//唤醒当前节点的下一个节点
unparkSuccessor(node);
}
//将当前节点下一节点指向自己
node.next = node; // help GC
}
}
看起来太复杂了,不过没关系,我们可以拆开看,其中有这一段代码
//当前节点线程置为空
node.thread = null;
//获取当前节点的前一个节点
Node pred = node.prev;
//跳过取消的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//记录过滤后的节点的后置节点
Node predNext = pred.next;
//将当前节点状态改为CANCELLED
node.waitStatus = Node.CANCELLED;
如图所示
通过while循环从后往前找到signal状态的节点,跳过中间cancelled状态的节点,同时将当前节点状态改为CANCELLED
我们可以把这复杂的判断条件转换成图来直观的看一下
当前节点是尾节点时,队列变成这样
当前节点是head后继节点
当前节点不是尾节点也不是头节点的后继节点(队列中的某个普通节点)
总结
太不容易了家人们,终于到了这里,我们再来总结一下整体的流程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.基于AQS实现的子类去实现tryAcquire
尝试获取锁
2.如果获取锁失败,则把当前节点通过addWaiter
方法包装成node
节点插入队列
如果尾节点为空或者CAS操作失败则调用 enq
方法保证成功插入到队列,若节点为空则初始化头节点
3.acquireQueued
方法,入队后的节点继续获取锁(此节点的前置节点是头节点)或者挂起
shouldParkAfterFailedAcquire
判断节点是否应该挂起如果当前节点的前置节点是signal状态,则返回true,可以挂起 如果当前节点的前置节点是cancelled,则队列会从当前节点的前一个节点开始从后向前遍历跳过cacelled状态的节点,将当前节点和非cacelled状态的节点连接起来,返回false,不可以挂起 否则将前置节点等待状态设置为SIGNAL,返回false,不可以挂起 parkAndCheckInterrupt
挂起当前线程cancelAcquire
将当前节点状态改为cancelld
4.selfInterrupt();
设置中断标志,将中断补上