如何手写一个 AQS?

Java技术迷

共 14259字,需浏览 29分钟

 · 2021-09-06


AQS 即 AbstractQueuedSynchronizer,是用来实现锁和线程同步的一个工具类。大部分操作基于 CAS 和 FIFO 队列来实现。


如果让我们自己 ,实现可以分为几个大部分:


  1. 加锁
  2. 解锁
  3. 入队
  4. 出队
  5. 阻塞
  6. 唤醒


我们来想一下这几个部分的实现。


1. 加锁


首先,用一个变量 state 作为锁的标志位。默认值 0,表示此时所有线程都可以加锁。加锁的时候通过 CAS 将 state 从 0 变为 1,CAS 执行成功表示加锁成功。


当有线程占有了锁,这时候有其他线程来加锁,按照下面判断当前来抢锁的线程是不是占用锁的线程:


  • :重入锁,state 加 1。当释放的时候 state 减 1。用 state 表示加锁的次数;

  • :加锁失败,将线程放入等待队列,并且阻塞。


有没有什么其他可以优化的地方?比如当放入等待队列的时候,看看有没有其他线程?


有。锁被占用了,并且轮不到当前线程来抢,直接阻塞就行了。在放入队列时候,通过CAS 再尝试获取一波锁。如果获取成功,就不用阻塞了,提高了效率。


2.  解锁


通过 CAS 对 state 减 1。如果是重入锁,释放一次减一次。当 state 等于 0 时表示锁被释放,唤醒等待队列中的线程。


3. 入队


入队这个过程和我们平常使用的队列不同。平常使用的队列每次生成一个节点放入即可。而 AQS 队列为空时,第一次生成两个节点。第一个节点代表当前占有锁的线程,第二个节点为抢锁失败的节点。不为空的时候,每次生成一个节点放入队尾。


当把线程放入队列中时,后续应该做哪些操作呢?


如果让你写是不是直接放入队列中就完事了?但 Doug Lea 是这样做的:


  • 如果当前线程是队列中的第二个节点,则再尝试抢一下锁(不是第二个节点就不用抢来,轮不到)。这样避免了频繁的阻塞和唤醒线程,提高了效率;

  • 上闹钟,让上一个线程来唤醒自己(后续会说到,即更改上一个节点的 waitStatus);

  • 阻塞。


4. 出队


当 A 线程释放锁,唤醒队列中的  B线程,A 线程会从队列中删除。那出队这个事情由谁来做?是由被唤醒的线程来做,即B 线程。


5. 阻塞和唤醒


阻塞和唤醒线程调用 API 即可:


// 阻塞线程LockSupport.park(this)// 唤醒线程LockSupport.unpark(this)

5.1 独占锁的获取和释放


JUC 中的许多并发工具类的实现都依赖 AbstractQueuedSynchronizer,例如  ReentrantLock、CountDownLatch 等。



AbstractQueuedSynchronizer 定义了一个锁实现的内部流程,而如何加锁和解锁则在各个子类中实现。这是典型的模板方法模式。


AQS 内部维护了一个 FIFO 的队列(底层实现就是双向链表),通过该队列来实现线程的并发访问控制。


队列中的元素是一个 Node 节点:


static final class Node {   //表示当前线程以共享模式持有锁   static final Node SHARED = new Node();   //表示当前线程以独占模式持有锁   static final Node EXCLUSIVE = null;     static final int CANCELLED =  1;   static final int SIGNAL    = -1;   static final int CONDITION = -2;   static final int PROPAGATE = -3;     //当前节点的状态   volatile int waitStatus;   //前继节点   volatile Node prev;   //后继节点   volatile Node next;   //当前线程   volatile Thread thread;   //存储在condition队列中的后继节点   Node nextWaiter;}

waitStatus 表示节点的状态,默认为 0。包含的状态有


再来看 AbstractQueuedSynchronizer 这个类的属性。


//等待队列的头节点private transient volatile Node head;//等待队列的尾节点private transient volatile Node tail;//加锁的状态,在不同子类中有不同的意义private volatile int state;

这个 state 在不同的子类中有不同的含义:


  • ReentrantLock 中的 state:表示加锁的次数。0 表示没有被加锁,1 表示被加锁一次,为 2 表示被加锁两次。ReentrantLock 是一个可以重入的锁;

  • CountDownLatch 中的 state:表示一个计数器。当 state>0 时,线程调用 await 会被阻塞,当 state 值被减少为 0 时,线程会被唤醒;

  • Semaphore 中的 state表示资源的数量。state>0 时,可以获取资源,并将 state-1。当 state=0 时,获取不到资源,此时线程会被阻塞。当资源被释放时,state+ 1,此时其他线程可以获得资源。


AbstractQueuedSynchronizer 中的 FIFO 队列用双向链表来实现。

AQS 提供了独占锁共享锁两种加锁方式,每种方式都有响应中断不响应中断的区别。


因此 AQS 的锁可以分为如下四类:


  • 不响应中断的独占锁(acquire)
  • 响应中断的独占锁(acquireInterruptibly)
  • 不响应中断的共享锁(acquireShared)
  • 响应中断的共享锁(acquireSharedInterruptibly)


而释放锁的方式只有两种:


  • 独占锁的释放(release)
  • 共享锁的释放(releaseShared)


6. 不响应中断的独占锁


以 ReentrantLock 为例,从加锁这一部分开始分析:


// 调用ReentrantLock.FairSync#lock方法其实就是调用acquire(1);public final void acquire(int arg) {   if (!tryAcquire(arg) &&       //获取到锁返回false,否则返回true      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))      //当前线程将自己中断      selfInterrupt();}

  1. 先尝试获取,如果获取到直接退出,否则进入第 2 步;

  2. 获取锁失败,以独占模式将线程包装成 Node 放到队列中;

  3. 如果放入的节点是队列的第二个节点,则再尝试获取锁。因为此时锁有可能释放类,不是第二个节点就不用尝试了,因为轮不到。如果获取到锁则将当前节点设为 head 节点,退出,否则进入第 4 步;

  4. 设置好闹钟后将自己阻塞;

  5. 线程被唤醒,重新竞争锁,获取锁成功,继续执行。如果线程发生过中断,则最后重置中断标志位位 true,即执行 selfInterrupt() 方法。


从代码层面详细分析一波


tryAcquire 是留给子类实现的:


protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}

这里通过抛出异常来告诉子类要重写这个方法。


为什么不将这个方法定义为 abstract 方法呢?


因为 AQS 有两种功能,独占和共享。如果用 abstract 修饰,则子类需要同时实现两种功能的方法,对子类不友好。


  • 当队列不为空,尝试将新节点通过CAS的方式设置为尾节点。如果成功,返回附加着当前线程的节点;
  • 当队列为空或者新节点通过 CAS 的方式设置为尾节点失败,进入 enq 方法。
private Node addWaiter(Node mode) {   Node node = new Node(Thread.currentThread(), mode);   Node pred = tail;   if (pred != null) {      node.prev = pred;      if (compareAndSetTail(pred, node)) {         pred.next = node;         return node;      }   }   enq(node);   return node;}


  • 当队列不为空,一直CAS,直到把新节点放入队尾;
  • 当队列为空,先往对列中放入一个节点,在把传入的节点 CAS 为尾节点。


前面说过,AQS 队列为空时,第一次会放入两个节点。


private Node enq(final Node node) {   for (;;) {      Node t = tail;      // 队列为空,进行初始化,      if (t == null) {         if (compareAndSetHead(new Node()))            tail = head;      } else {         node.prev = t;         if (compareAndSetTail(t, node)) {            t.next = node;            return t;         }      }   }}

放入队列后还要干什么?


  • 如果是第二个节点再尝试获取一波锁,因为此时有可能锁已经释放了,其他节点就不用了,因为还轮不到;
  • 上闹钟,让别的线程唤醒自己;
  • 阻塞自己。

// 自旋获取锁,直到获取锁成功,或者异常退出// 但是并不是busy acquire,因为当获取失败后会被挂起,由前驱节点释放锁时将其唤醒// 同时由于唤醒的时候可能有其他线程竞争,所以还需要进行尝试获取锁,体现的非公平锁的精髓。final boolean acquireQueued(final Node node, int arg) {   boolean failed = true;   try {      boolean interrupted = false;      for (;;) {         // 获取前继节点         final Node p = node.predecessor();         // node节点的前继节点是head节点,尝试获取锁,如果成功说明head节点已经释放锁了         // 将node设为head开始运行(head中不包含thread)         if (p == head && tryAcquire(arg)) {            setHead(node);            // 将第一个节点出队            p.next = null; // help GC            failed = false;            return interrupted;         }         // 获取锁失败后是否可以挂起         // 如果可以挂起,则阻塞当前线程(获取锁失败的节点)         if (shouldParkAfterFailedAcquire(p, node) &&            parkAndCheckInterrupt()) {                interrupted = true;            }     }   } finally {      if (failed)       cancelAcquire(node);   }}

根据前继节点的状态,判断是否可以阻塞当前获取锁失败的节点。


一般情况会经历如下两个过程:


  1. 默认情况下,上一个节点的 waitStatus 等于 0,所以会进入 compareAndSetWaitStatus 方法。通过 CAS 将上一个节点的 waitStatus 设置为 SIGNAL,然后 return false;
  2. shouldParkAfterFailedAcquire 方法外面是一个死循环。当再次进入这个方法时,如果上一步 CAS 成功,则会走第一个 if,return true。接着执行 parkAndCheckInterrupt,线程会阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {   int ws = pred.waitStatus;   // 前继节点释放时会unpark后继节点,可以挂起   if (ws == Node.SIGNAL)      return true;   if (ws > 0) {      //将CANCELLED状态的线程清理出队列      // 后面会提到为什么会有CANCELLED的节点      do {         node.prev = pred = pred.prev;      } while (pred.waitStatus > 0);      pred.next = node;   } else {      // 将前继节点的状态设置为SIGNAL,代表释放锁时需要唤醒后面的线程      // cas更新可能失败,所以不能直接返回true      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);   }   return false;}

shouldParkAfterFailedAcquire 表示上好闹钟了,可以阻塞线程了。后续当线程被唤醒的时候会从 return 语句出继续执行,然后进入 acquireQueued 方法的死循环,重新抢锁。至此,加锁结束。


// 挂起线程,返回是否被中断过private final boolean parkAndCheckInterrupt() {   // 阻塞线程   LockSupport.park(this);   // 返回当前线程是否被调用过Thread#interrupt方法   return Thread.interrupted();}

最后用一个流程图来解释不响应中断的独占锁:


入队过程中发生异常该怎么办?


可以看到上面调用 acquireQueued 方法发生异常的时候,会调用 cancelAcquire 方法。我们就详细分析一下这个 cancelAcquire 方法有哪些作用


哪些地方执行发生异常会执行 cancelAcquire?


以看到调用 cancelAcquire 方法的有如下几个部分:



分析这些方法的调用,发现基本就是如下两个地方会发生异常:


  1. 尝试获取锁的方法如 tryAcquire,这些一般是交给子类来实现的;
  2. 当线程是被调用 Thread#interrupt 方法唤醒,如果要响应中断,会抛出 InterruptedException。


//处理异常退出的nodeprivate void cancelAcquire(Node node) {   if (node == null)      return;     // 设置该节点不再关联任何线程   node.thread = null;     // 跳过CANCELLED节点,找到一个有效的前继节点   Node pred = node.prev;   while (pred.waitStatus > 0) {        node.prev = pred = pred.prev;   }      // 获取过滤后的有效节点的后继节点   Node predNext = pred.next;     // 设置状态为取消   node.waitStatus = Node.CANCELLED;     // case 1   if (node == tail && compareAndSetTail(node, pred)) {      compareAndSetNext(pred, predNext, null);   } else {      // case 2      int ws;      if (pred != head &&          ((ws = pred.waitStatus) == Node.SIGNAL ||          (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&       pred.thread != null) {       Node next = node.next;       if (next != null && next.waitStatus <= 0) {          compareAndSetNext(pred, predNext, next);       }      } else {         // case3         unparkSuccessor(node);      }        node.next = node; // help GC   }}

将 node 出队有如下三种情况:


  • 当前节点是 tail;
  • 当前节点不是 head 的后继节点,也不是 tail;
  • 当前节点是 head 的后继节点。


当前节点是 tail


compareAndSetTail 将 tail 指向 pred compareAndSetNext,将 pred 的 next 指向 null,也就是把当前节点移出队列。


当前节点不是 head 的后继节点,也不是 tail



这里将 node 的前继节点的 next 指向了 node 的后继节点,即 compareAndSetNext(pred, predNext, next)。


注意:pred 和n ode 节点中间有可能有 CANCELLED 的节点,怕乱就没画出来。


当前节点是 head 的后继节点


没有对队列进行操作,只是进行 head 后继节点的唤醒操作(unparkSuccessor 方法,后面会分析这个方法)。因为此时是 head 的后继节点,还是有可能获取到锁的,所以唤醒它尝试获取一波锁


当再次调用到 shouldParkAfterFailedAcquire(判断是否应该阻塞的方法时)会把 CANCELLED 状态的节点从队列中删除。


7. 独占锁的释放


独占锁是释放其实就是利用 CAS 将 state-1。当 state=0 表示锁被释放,需要将阻塞队列中的线程唤醒。


// 调用ReentrantLock#unlock方法其实就是调用release(1)public final boolean release(int arg) {   // 尝试释放锁   // 当state=0,表示锁被释放,tryRelease返回true,此时需要唤醒阻塞队列中的线程   if (tryRelease(arg)) {      Node h = head;      if (h != null && h.waitStatus != 0) {         unparkSuccessor(h);      }      return true;   }   return false;}

tryRelease 即具体的解锁逻辑,需要子类自己去实现


唤醒同步队列中的线程,可以看到前面加了判断 h != null && h.waitStatus != 0。


  • h = null 说明同步同步队列中没有数据,则不需要唤醒;

  • h = null && waitStatus = 0 同步队列是有了,但是没有线程给自己上闹钟,不用唤醒;

  • h != null && waitStatus < 0,说明头节点被人上了闹钟,自己需要唤醒阻塞的线程;

  • h != null && waitStatus > 0,头节点因为发生异常被设置为取消,但还是得唤醒线程。


                private void unparkSuccessor(Node node) {   int ws = node.waitStatus;   if (ws < 0) {      compareAndSetWaitStatus(node, ws, 0);   }     // 头结点的下一个节点   Node s = node.next;   // 为空或者被取消   if (s == null || s.waitStatus > 0) {      s = null;      // 从队列尾部向前遍历找到最前面的一个waitStatus<=0的节点      for (Node t = tail; t != null && t != node; t = t.prev) {         if (t.waitStatus <= 0) {            s = t;         }      }   }   if (s != null) {      // 唤醒节点,但并不表示它持有锁,要从阻塞的地方开始运行      LockSupport.unpark(s.thread);   }}


为什么要从后向前找第一个非 CANCELLED 的节点呢?


private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            // 线程在这里挂起了            pred.next = node;            return node;        }    }    enq(node);    return node;}

这其实和入队的逻辑有关系。


假如 Node1 在图示位置挂起了,Node1 后面又陆续增加了 Node2 和 Node3。此时从前向后遍历会导致元素丢失,不能正确唤醒线程。


8. 分析一下独占锁响应中断和不响应中断的区别


之前说过独占锁可以响应中断,也可以不响应中断。调用的方法如下:


  • 不响应中断的独占锁(acquire)
  • 响应中断的独占锁(acquireInterruptibly)

所以只需要看这两个个方法的区别在哪里,下面只列出有区别的部分:


public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();} public final void acquireInterruptibly(int arg)         throws InterruptedException {     // 判断线程是否被中断     if (Thread.interrupted())         throw new InterruptedException();     if (!tryAcquire(arg))         doAcquireInterruptibly(arg); }


acquire 在尝试获取锁的时候完全不管线程有没有被中断,而 acquireInterruptibly 在尝试获取锁之前会判断线程是否被中断。如果被中断,则直接抛出异常。

tryAcquire 方法一样,只需要对比 acquireQueued 方法和 doAcquireInterruptibly 方法的区别即可。



执行 acquireQueued 方法当线程发生中断时,只是将 interrupted 设置为 true,并且调用 selfInterrupt 方法将中断标志位设置为 true。



而执行 doAcquireInterruptibly 方法,当线程发生中断时,直接抛出异常。


最后看一下 parkAndCheckInterrupt 方法,这个方法中判断线程是否中断的逻辑特别巧妙!


private final boolean parkAndCheckInterrupt() {   LockSupport.park(this);   return Thread.interrupted();}

Thread 类提供了如下两个个方法来判断线程是否是中断状态:


  • isInterrupted
  • interrupted


这里为什么用 interrupted 而不是 isInterrupted 呢?


演示一下这两个方法的区别:


@Testpublic void testInterrupt() throws InterruptedException {    Thread thread = new Thread(() -> {        while (true) {}    });    thread.start();    TimeUnit.MICROSECONDS.sleep(100);    thread.interrupt();    // true    System.out.println(thread.isInterrupted());    // true    System.out.println(thread.isInterrupted());    // true    System.out.println(thread.isInterrupted());}
@Testpublic void testInterrupt2() { Thread.currentThread().interrupt(); // true System.out.println(Thread.interrupted()); // false System.out.println(Thread.interrupted()); // false System.out.println(Thread.interrupted());}

isInterrupted 和 interrupted 的方法区别如下:


  • Thread#isInterrupted:测试线程是否是中断状态,执行后不更改状态标志;

  • Thread#interrupted:测试线程是否是中断状态,执行后将中断标志更改为 false。


接着再写两个例子:


public static void main(String[] args) {   LockSupport.park();   // end被一直阻塞没有输出   System.out.println("end");}
public static void main(String[] args) { Thread.currentThread().interrupt(); LockSupport.park(); // 输出end System.out.println("end");}

可以看到当线程被中断时,调用 park() 方法并不会被阻塞:


public static void main(String[] args) {   Thread.currentThread().interrupt();   LockSupport.park();   // 返回中断状态,并且清除中断状态   Thread.interrupted();   // 输出start   System.out.println("start");   LockSupport.park();   // end被阻塞,没有输出   System.out.println("end");}

到这我们就能理解为什么要进行中断的复位了:


  • 如果当前线程是非中断状态,则在执行 park 时被阻塞,返回中断状态 false;
  • 如果当前线程是中断状态,则 park 方法不起作用,返回中断状态 true。interrupted 将中断复位,变为 false;
  • 再次执行循环的时候,前一步已经在线程的中断状态进行了复位,则再次调用 park 方法时会阻塞。


所以这里要对中断进行复位,是为了不让循环一直执行,让当前线程进入阻塞状态,如果不进行复位,前一个线程在获取锁之后执行了很耗时的操作,那当前线程岂不是要一直执行死循环,造成CPU使用率飙升?


独占锁的获取和释放我们已经搞清楚了,接下来基于 AQS 自己写一个锁。


9. 基于 AQS 自己写一个锁


AQS 已经把入队、出队、阻塞、唤醒的操作都封装好了。当我们用 AQS 来实现自己的锁时,就非常的方便了,只需要重写加锁和解锁的逻辑即可。这里演示一个基于 AQS 实现的非重入的互斥锁。


public class MyLock {    private final Sync sync;    public MyLock() {        sync = new Sync();    }    public class Sync extends AbstractQueuedSynchronizer {        @Override        protected boolean tryAcquire(int arg) {            return compareAndSetState(0, arg);        }        @Override        protected boolean tryRelease(int arg) {            setState(0);            return true;        }    }    public void lock() {        sync.acquire(1);    }    public void unLock() {        sync.release(1);    }}


1、灵魂一问:你的登录接口真的安全吗?
2、HashMap 中这些设计,绝了~
3、在 IntelliJ IDEA 中这样使用 Git,贼方便了!
4、计算机时间到底是怎么来的?程序员必看的时间知识!
5、这些IDEA的优化设置赶紧安排起来,效率提升杠杠的!
6、21 款 yyds 的 IDEA插件
7、真香!用 IDEA 神器看源码,效率真高!

点分享

点收藏

点点赞

点在看

浏览 41
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报