AQS解析与实战
前言
前段时间在面试,发现面试官都有问到同步器AQS的相关问题。AQS为Java中几乎所有的锁和同步器提供一个基础框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的几个核心点,谈谈对AbstractQueuedSynchronizer的理解,并实现一个自定义同步器。
AQS原理面试题的核心回答要点
state 状态的维护。
CLH队列
ConditionObject通知
模板方法设计模式
独占与共享模式。
自定义同步器。
AQS全家桶的一些延伸,如:ReentrantLock等。
AQS的类图结构
AQS全称是AbstractQueuedSynchronizer,即抽象同步队列。下面看一下AQS的类图结构:

为了方便下面几个关键点的理解,大家先熟悉一下AQS的类图结构。
state 状态的维护
在AQS中维持了一个单一的共享状态state,来实现同步器同步。看一下state的相关代码如下:state源码
/*** The synchronization state.*/private volatile int state;/*** Returns the current value of synchronization state.* This operation has memory semantics of a {@code volatile} read.* @return current state value*/protected final int getState() {return state;}/*** Sets the value of synchronization state.* This operation has memory semantics of a {@code volatile} write.* @param newState the new state value*/protected final void setState(int newState) {state = newState;}/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a {@code volatile} read* and write.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that the actual* value was not equal to the expected value.*/protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
state 源码设计几个回答要点:
state用volatile修饰,保证多线程中的可见性。
getState()和setState()方法采用final修饰,限制AQS的子类重写它们两。
compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。
CLH队列
谈到CLH队列,我们结合以上state状态,先来看一下AQS原理图:

CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
Node节点
CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),condition队列的后续节点(nextWaiter)如下图:

waitStatus几种状态状态:

我们再看一下CLH队列入列以及出列的代码:
入列
CLH队列入列就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下:
//构造Nodeprivate Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)Node pred = tail;if (pred != null) {node.prev = pred;//CAS设置尾节点if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//多次尝试enq(node);return node;}
由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点,enq方法如下:
private Node enq(final Node node) {//死循环尝试,知道成功为止for (;;) {Node t = tail;//tail 不存在,设置为首节点if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
出列
首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。可以看一下以下两段源码:
Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
CLH核心几个回答要点
双向链表入列出列
CAS算法设置尾节点+死循环自旋。
ConditionObject
ConditionObject简介
我们都知道,synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持 。
Condition队列与CLH队列的那些事
我们先来看一下图:

ConditionObject队列与CLH队列的爱恨情仇:
调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点。
线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒。
当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒。
区别:
ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。
独占与共享模式。
AQS支持两种同步模式:独占式和共享式。
独占式
同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。
公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。
非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。
acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:
acquire(long arg)方法
public final void acquire(long arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
addWaiter方法
//构造Nodeprivate Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)Node pred = tail;if (pred != null) {node.prev = pred;//CAS设置尾节点if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//多次尝试enq(node);return node;}
acquireQueued(final Node node, long arg)方法
final boolean acquireQueued(final Node node, long arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
selfInterrupt()方法
static void selfInterrupt() {Thread.currentThread().interrupt();}
结合源代码,可得acquire(int arg)方法流程图,如下:

共享式
多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。
acquireShared(long arg)是共享式获取同步状态的方法,可以看一下源码:
public final void acquireShared(long arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
由上可得,先调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败,调用doAcquireShared(int arg)自旋方式获取同步状态,方法源码如下:
private void doAcquireShared(long arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
AQS的模板方法设计模式
模板方法模式
模板方法模式:在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。
模板方法模式生活中的例子:假设我们要去北京旅游,那么我们可以坐高铁或者飞机,或者火车,那么定义交通方式的抽象类,可以有以下模板:买票->安检->乘坐xx交通工具->到达北京。让子类继承该抽象类,实现对应的模板方法。

AQS定义的一些模板方法如下:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
简言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,给子类实现自定义的同步器。
自定义同步器。
基于以上分析,我们都知道state,CLH队列,ConditionObject队列 等这些关键点,你要实现自定义锁的话,首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法。
我们来看一下基于 AQS 实现的不可重入的独占锁的demo,来自《Java并发编程之美》:
public class NonReentrantLock implements Lock,Serializable{//内部类,自定义同步器static class Sync extends AbstractQueuedSynchronizer {//是否锁已经被持有public boolean isHeldExclusively() {return getState() == 1;}//如果state为0 则尝试获取锁public boolean tryAcquire(int arg) {assert arg== 1 ;//CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1if(compareAndSetState(0, 1)){//设置当前独占的线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁,设置state为0public boolean tryRelease(int arg) {assert arg ==1;//如果同步器同步器状态等于0,则抛出监视器非法状态异常if(getState() == 0)throw new IllegalMonitorStateException();//设置独占锁的线程为nullsetExclusiveOwnerThread(null);//设置同步状态为0setState(0);return true;}//返回Condition,每个Condition都包含了一个Condition队列Condition newCondition(){return new ConditionObject();}}//创建一个Sync来做具体的工作private final Sync sync= new Sync ();@Overridepublic void lock() {sync.acquire(1);}public boolean isLocked() {return sync.isHeldExclusively();}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {sync.release(1);}@Overridepublic Condition newCondition() {return sync.newCondition();}}
NonReentrantLockDemoTest:
public class NonReentrantLockDemoTest {private static NonReentrantLock nonReentrantLock = new NonReentrantLock();public static void main(String[] args) {for (int i = 0; i < 10; i++) {Thread thread = new Thread(() -> {nonReentrantLock.lock();try {System.out.println(Thread.currentThread().getName());Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();} finally {nonReentrantLock.unlock();}});thread.start();}}}
运行结果:

AQS全家桶实战
AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下来可以看一下它们的使用案例。
ReentrantLock
ReentrantLock介绍
ReentrantLock为重入锁,能够对共享资源能够重复加锁,是实现Lock接口的一个类。
ReentrantLock支持公平锁和非公平锁两种方式
ReentrantLock案例
使用ReentrantLock来实现个简单线程安全的list,如下:
public class ReentrantLockList {// 线程不安全的listprivate ArrayList<String> array = new ArrayList<>();//独占锁private volatile ReentrantLock lock = new ReentrantLock();//添加元素public void add(String e){lock.lock();try {array.add(e);}finally {lock.unlock();}}//删除元素public void remove(String e){lock.lock();try {array.remove(e);}finally {lock.unlock();}}//获取元素public String get(int index){lock.lock();try {return array.get(index);}finally {lock.unlock();}}}
Semaphore
Semaphore介绍
Semaphore也叫信号量,可以用来控制资源并发访问的线程数量,通过协调各个线程,以保证合理的使用资源。
Semaphore案例
Java多线程有一到比较经典的面试题:ABC三个线程顺序输出,循环10遍。
public class ABCSemaphore {private static Semaphore A = new Semaphore(1);private static Semaphore B = new Semaphore(1);private static Semaphore C = new Semaphore(1);static class ThreadA extends Thread {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {A.acquire();System.out.print("A");B.release();}} catch (InterruptedException e) {e.printStackTrace();}}}static class ThreadB extends Thread {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {B.acquire();System.out.print("B");C.release();}} catch (InterruptedException e) {e.printStackTrace();}}}static class ThreadC extends Thread {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {C.acquire();System.out.print("C");A.release();}} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {// 开始只有A可以获取, BC都不可以获取, 保证了A最先执行B.acquire();C.acquire();new ThreadA().start();new ThreadB().start();new ThreadC().start();}

腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)
面试:史上最全多线程面试题 !
最新阿里内推Java后端面试题
JVM难学?那是因为你没认真看完这篇文章

关注作者微信公众号 —《JAVA烂猪皮》
了解更多java后端架构知识以及最新面试宝典


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力
