老板让只懂Java基本语法的我,基于AQS实现一个锁
10 点整,我到了公司,又成为全组最后一个到的员工。
正准备刷刷手机摸摸鱼,看见老板神秘兮兮地走了过来。
老板:闪客呀,你写个工具,基于 AQS 实现一个锁,给咱们组其他开发用
我:哦好的
老板:你多久能搞好?
我:就是一个工具类是吧,嗯今天下午就给你吧
老板:嗯,那你抓紧时间搞吧,大家都等着用呢
我:哦好的
先写个框架
关于锁,我还算有一个模糊的认识的,要让使用者可以获取锁、释放锁,来实现多线程访问时的安全性。于是我赶紧先把一个框架写了出来。
// 给帅气老板用的锁
public class FlashLock {
// 释放锁
public void lock() {}
// 释放锁
public void unlock() {}
}
工具类已经完成一半了,一想到全组的开发们下午就会这样用到我的工具,我不禁笑出了声音。
FlashLock flashLock = new FlashLock();
public void doSomeThing() {
// 获取锁,表示同一时间只允许一个线程执行这个方法
flashLock.lock();
try {
...
} finally {
// 优雅地在 finally 里释放锁
flashLock.unlock();
}
}
随着同事们投来异样的眼光,我回过神来。继续想,我怎么在这俩方法里实现这种锁的效果呢?脑子一片空白呀,诶不过老板刚刚说要基于 AQS,那肯定这个东西可以给我提供一些方便吧,于是我在百度百科搜了一下什么是 AQS
百度百科尚未收录词条 “AQS”
这老板水平也太次了,给我推荐个百科上都搜不到的东西... 只能搜搜百度了
额!这看起来还是个 Java 面试的重点呢!真是错怪老板了。
我点了其中一篇,了解到 AQS 的全称叫 AbstractQueuedSynchronizer(抽象的队列式同步器),是一个 JDK 源码中的一个类。
嗨,搞了半天只是个类而已嘛,对我这种源码在手天下我有的神级码农,还看什么文章呀,我迅速打开了 JDK1.8 源码,找到了这个类。
我的天,一共 2316 行!我赶紧把所有注释都去掉,发现还有 914 行。
由于下午就要交稿,我打消了不看注释硬啃源码的念头,开始从头看起了注释...
2:使用 AQS 实现最简单的锁
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released.
第一句话说这是个框架,之后说这个类是基于一个原子变量,这说的都是原理我先不管。
后面又说子类(Subclasses)必须实现一些改变状态(change this state)和获取释放锁(acquired or released)的方法。
哦!看来我需要用一个子类继承它,然后实现它指定的一些方法,其他的事情这个父类都会帮我做好的。敏锐的我马上察觉到,这用的模板方法这种设计模式,这是我最喜欢的设计模式了,因为只需要读懂需要让子类实现的模板方法的含义,即可以很好地使用这个类的强大功能。
于是我赶紧去找,有哪些这样的模板方法,需要子类去实现,果然在注释中发现了这样一段话。
* To use this class as the basis of a synchronizer, redefine the
* following methods
*
* <li> {@link #tryAcquire}
* {@link #tryRelease}
* {@link #tryAcquireShared}
* {@link #tryReleaseShared}
* {@link #isHeldExclusively}
*
在源码中找到这几个类
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
一看清一色都是抛出异常我就放心了,这正是留给我们子类实现的模板方法呀,接下来就是我写个类实现他们就好咯,可是怎么写...
正想去百度,突然发现注释中居然给出了一段 基于 AQS 的实现小 demo,还挺长,我理解了它的意思,并且把我看不懂的都去掉了,写出了很简洁的锁
public class FlashLock {
// 获取锁(这回填好骨肉了)
public void lock() {
sync.acquire(1);
}
// 释放锁
public void unlock() {
sync.release(1);
}
private final Sync sync = new Sync();
// 这个内部类就是继承并实现了 AQS 但我这里只先实现两个方法
private static class Sync extends AbstractQueuedSynchronizer {
@Override
public boolean tryAcquire(int acquires) {
// CAS 方式尝试获取锁,成功返回true,失败返回false
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
// 释放锁,这里为什么不像上面那样也是 CAS 操作呢?请读者思考
setState(0);
return true;
}
}
}
lock 和 unlock 方法都实现了,我赶紧写个经典的测试代码
// 可能发生线程安全问题的共享变量
private static long count = 0;
// 两个线程并发对 count++
public static void main(String[] args) throws Exception {
// 创建两个线程,执行add()操作
Thread th1 = new Thread(()-> add());
Thread th2 = new Thread(()-> add());
// 启动两个线程
th1.start();
th2.start();
// 等待两个线程执行结束
th1.join();
th2.join();
// 这里应该是 20000 就对了,说明锁生效了
System.out.println(count);
}
// 我画了一上午写出来的锁,哈哈
private static ExampleLock exampleLock = new ExampleLock();
// 循环 count++,进行 10000 次
private static void add() {
exampleLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
add2();
// 没啥异常,我就直接释放锁了
exampleLock.unlock();
}
测了好几次,发现都是 20000,哈哈,大功告成,我赶紧在大群里 @所有人,告诉大家我写的新工具。同事和老板纷纷给我点了赞。
我又忍不住笑出了声音。走出了公司,准备找个地方吃午饭。
不得不研究下 AQS 的原理
下午两点整,我又成为公司最后一个午睡起床的人...
小宇:闪客,你的工具类确实好用,而且源码也很简洁
我:哈哈,大家喜欢用就好
小宇:不过我有个问题,就是我用你的这个锁工具,有的线程总是抢不到锁,有的线程总是能抢到锁。虽说线程们抢锁确实看命,但能不能加入一种设计,让各个线程机会均等些,起码不要出现某几个线程总是特倒霉抢不到锁的情况呢?
我:这怎么可能,我就是写个锁工具,还能影响到人家 CPU 和操作系统层面的机制?
小宇:你想想吧,作为公司最帅的程序猿,我相信你哦
我:额这...
我这人最禁不住妹子夸奖,赶紧开启电脑屏幕,盯着我的获取锁的代码看
@Override
public boolean tryAcquire(int acquires) {
// 一上来就 CAS 抢锁
if (compareAndSetState(0, acquires)) {
return true;
}
return false;
}
我发现这段代码中在尝试获取锁时,一上来就 CAS 抢锁,一旦成功就返回了 true。那我这里是否能加入某些机制,使这些线程不要一有机会就开始直接开始抢锁,而是先考虑一下其他线程的感受再决定是否抢锁呢?
我发现此时不得不研究一下 AQS 的内部实现逻辑了,也就是原理,看看能不能得到一些思路。
我看 AQS 虽然方法一大堆,但属性一共就四个(有一个是内部类 Node)
public abstract class AbstractQueuedSynchronizer {
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final class Node {}
}
static final class Node {
// ... 省略一些暂不关注的
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
结合最开始看那段对 AQS 高度概括的注释
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.
不难猜到这里的内部类 Node 以及其类型的变量 head 和 tail 就表示 AQS 内部的一个等待队列,而剩下的 state 变量就用来表示锁的状态。
等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node 的属性我们知道,Node 存放了当前线程的指针 thread,也即可以表示当前线程并对其进行某些操作,prev 和 next 说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。
太好了,仅仅看一些属性和一段注释,就得到了一个关于 AQS 大致原理的猜测,看起来还挺靠谱,我赶紧把它画成几张图来加深理解。(由于这里非常重要,就不再卖关子了,直接画出最正确的图理解,但不会过于深入细节)
以下的图是 AQS 最为核心的几行代码的直观理解过程,请大家仔细品味
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先初始状态 AQS 的 state=0,表示没有线程持有锁。head 和 tail 也都为空,表示此时等待队列里也没有线程。
这时第一个线程(线程1)来了,没有任何线程和它抢,直接拿到了锁(state=1)
然后线程2也来了,假设此时线程1没有释放锁,那么线程2抢锁失败(执行你自己写的 tryAcquire)。失败后,剩下的事都是 AQS 帮你做的,首先加入等待队列的队尾 addWaiter(此时队列为空,所以要先初始化一个占位的头结点),然后在循环里尝试获取锁 acquireQueue(注意这里面有相当多的细节,首先前驱结点是头结点的才能尝试获取锁,即排在队头的才有机会。再有循环里获取锁并不是一直 CAS,而是通过一个标志控制了次数,使得 CAS 两次都失败后就将线程挂起 park,之后等待持有锁的线程释放锁之后再唤醒 unpark。其余各种细节,希望读者阅读源码,不是一句两句说清楚的)。
然后线程3也来了,经历了和线程2一样的经历,只不过它的前驱结点不是头结点,因此还不能有机会尝试获取锁,只有等线程2抢到了锁并且出队,自己的前驱结点变成了头结点,才可以。
这时线程1终于释放了锁(state=0),与此同时找到队列的头结点进行唤醒 unpark。此时头结点是线程2表示的 Node,因此对线程2进行了唤醒操作。如果此时线程2没有被挂起,说明还在尝试获取锁的过程中,那么就尝试好了。如果已经被挂起了,那么唤醒线程2,使得线程2继续不断尝试 CAS 获取锁,直到成功为止。
如此,循环往复... 你大概懂了么?
嗯原理搞懂了,实现一个公平锁
仔细看上面的倒数第二张图。
好好好,你懒得往上翻,我给你粘过来。
原本在队列中等待的线程 2,被线程 1 释放锁之后唤醒了,但它仍然需要抢锁,而且有可能抢失败。
那如果每次这个线程 2 尝试抢锁时,都有其他新来的线程把锁抢去,那线程 2 就一直得不到运行机会,而且排在线程 2 后面的等待线程,也都没有机会运行。
导致有的线程一直得不到运行机会的,就是这个新进来的线程每次都不管有没有人排队,都直接上来就抢锁导致的。
妥了,刚刚小宇提出的问题,我终于有了思路,就是让新来的线程抢锁时,先问一句,“有没有人排队呀?如果有人排队那我先排到队尾好了”。
@Override
public boolean tryAcquire(int acquires) {
// 原有基础上加上这个
if (有线程在等待队列中) {
// 返回获取锁失败,AQS会帮我把该线程放在等待队列队尾的
return false;
}
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}
怎么判断是否有线程在等待队列呢?机智的我觉得,AQS 这么优秀的框架一定为上层提供了一个方法,不会让我们深入到它实现的内部的,果然我找到了。
public final boolean hasQueuedPredecessors()
再经过优化结构后,最终的代码变成了这样
@Override
public boolean tryAcquire(int acquires) {
if (hasQueuedPredecessors() &&
compareAndSetState(0, 1)) {
return true;
}
return false;
}
哈哈,大功告成,赶紧去找小宇显摆一下。
等等...
那我原来的那种实现方式就没了,肯定有其他人找我质问,emmm,我两种方式都暴露给大家吧,随大家选。
我将原来的暴力抢锁方式起了个名,叫非公平锁,因为线程抢锁不排队,纯看脸。按小宇需求实现的排队获取锁,我叫它公平锁,因为只要有线程在排队,新来的就得乖乖去排队,不能直接抢。
// 想要公平锁,就传 true 进来
public FlashLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
哈哈,有了高大上的名字和代码实现,我兴高采烈去找小宇交差了。
老板要求方法可以重入
晚上五点半,我正准备成为全组第一个去吃饭的人,突然老板阴着脸跑了过来。
老板:闪客,我用你这工具导致一个线程卡死了呀,一直获取不到锁
我:嗯怎么会呢?
老板:代码发你了,你赶紧看看!
我打开了锁了屏的电脑,点开了老板发来的代码
public void doSomeThing2() {
flashLock.lock();
doSomeThing2();
flashLock.unlock();
}
public void doSomeThing2() {
flashLock.lock();
...
flashLock.unlock();
}
我恍然大悟,原来一个线程执行了一个方法,获取了锁,这个方法没有结束,又调用了另一个需要锁的方法,于是卡在这再也不走了。
这个原理很容易理解,但这似乎用起来确实不太友好,怪不得老板那么生气。有没有办法,让同一个线程持有锁时,还能继续获取锁(可重入),只有当不同线程才互斥呢?
我苦思冥想,感觉不对呀,现在 AQS 里面的所有变量我都用到了,没见哪个变量可以记录当前线程呀。
哦对!AQS 本身还继承了 AbstractOwnableSynchronizer 这个类!我很快在这个类里面发现了这个属性!
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
熟悉了之前的套路,我很快又找到了这两个方法!
protected final void setExclusiveOwnerThread(Thread thread);
protected final Thread getExclusiveOwnerThread();
大功告成,此时我只要在一个线程发现锁已经被占用时,不直接放弃,而是再看一下占用锁的线程是不是正是我自己,就好了。有了前面的经验,这次我直接写出了最终的可重入的公平锁代码。
@Override
public boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (hasQueuedPredecessors() && compareAndSetState(0, 1)) {
// 拿到锁记得记录下持锁线程是自己
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 看见锁被占了(state!=0)也别放弃,看看是不是自己占的
setState(c + acquires);
return true;
}
return false;
}
6. 下班!
我把这个最终版的锁代码提交,霸气地收拾东西下班了,今天真是收获满满。
好啦我的故事讲完了,如果你坚持读到了这里并且完全理解了上面的所有事情,那么恭喜你,你已经掌握了 AQS 的核心原理以及基于它的一个经典的锁实现 ReentrantLock 的几乎全部知识点,AQS 的体系骨架算是被你不知不觉建立起来了,这两个都是 Java 程序员面试必备的东西。
虽然这只是皮毛,但如果你是第一次接触这两个概念,那本篇文章的最大意义在于对他们有了一个三观很正的第一印象。我希望 AQS 的给你的第一印象不是什么抽象的队列式同步器,而只是一个为了更方便实现各种锁而提供的一个包含几个模板方法的类而已,虽然并不准确,而且显得很 low,但实则可能恰恰是说到了本质。
7. 继续深入 AQS
我之后也会出关于 AQS 继续深入的文章,不过下面的三篇系列文章你可以花上两三个小时在电脑上看一下,真的非常非常非常给力。
一行一行源码分析清楚 AQS(一): 一行一行源码分析清楚 AQS(二) 一行一行源码分析清楚 AQS(三)
另外,我也推荐你,用跟踪源码或 debug 的方式,从头到尾自己跟一遍下面三行代码,是几乎 AQS 的全部核心逻辑,这个看懂了,其他的都是浮云。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}