JUC并发编程之ReentrantLock非公平锁源码详解

下图就是基于AQS所实现的一些比较强大的功能类,例如公平与非公平锁,信号量,线程池


// 标识拿到锁的是哪个线程private transient Thread exclusiveOwnerThread;
// 标识头节点private transient volatile Node head;// 标识尾节点private transient volatile Node tail;// 同步状态,为0时,说明可以抢锁private volatile int state;
// 记录Node状态volatile int waitStatus;// 标识Node是哪个线程所持有volatile Node prev;// 前驱节点(当前Node的上一个是谁)volatile Node next;// 后继节点(当前Node的个一个是谁)volatile Thread thread;
下面是一个简单的lock案例,使用lock需要注意点,我们获取锁需要被try包裹,释放锁一定要在finally里面,为什么需要这么做呢?是因为如果多个线程同时进来抢锁,当其中一个线程抢到锁后,执行完业务逻辑还没来得及释放锁就发生了异常,就会导致锁没有被释放,从而该业务逻辑一直被卡住。所以为了避免这一现象的发生,我们的释放锁一定要写在finally里面啦~
public class ReentrantLockTest {// 初始化Lock类static final Lock lock = new ReentrantLock();static int sum = 0;public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 3; i++) {Thread thread = new Thread(new Runnable() {@Overridepublic void run() {try {// 获取锁lock.lock();for (int i1 = 0; i1 < 10000; i1++) {sum++;}} catch (Exception e) {e.printStackTrace();} finally {//释放锁lock.unlock();}}});thread.start();}TimeUnit.SECONDS.sleep(1);System.out.println(sum);}}
在ReentLock里面,其中有两个构造方法值得我们注意


public void lock() {sync.lock();}

首先尝试以最快的方式获取锁,当多个线程同时进来了,只会有其中一个线程以CAS的方式将state的值更新为1,前提条件是只有当state的值为0的时候才能更新成功,同时会记录获取锁的线程,若获取锁失败,则执行acquire方法
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {//以cas的方式将AQS中的state属性值从0更新为1if (compareAndSetState(0, 1))//如果更新成功,则将当前线程设置为持有锁的线程setExclusiveOwnerThread(Thread.currentThread());else//获取锁失败,则进入该方法acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
在ReentrantLock中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
找到关键点 -> 非公平类加锁入口

如下则是非公平锁的实现方式,在其底层调用了nonfairTryAcquire()方法
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
final boolean nonfairTryAcquire(int acquires) {//记录当前线程final Thread current = Thread.currentThread();//获取当前state的值int c = getState();//如果state的值为0,则证明没有线程抢占锁if (c == 0) {// 通过cas进行加锁操作if (compareAndSetState(0, acquires)) {// 记录加锁线程setExclusiveOwnerThread(current);return true;}}// 如果锁已经被抢占,且当前线程就是持有锁的线程,则说明该锁被重入else if (current == getExclusiveOwnerThread()) {//计算要更新后的state值int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");//给state设置值,该方式为非同步setState(nextc);//重入锁获取成功return true;}//获取锁失败return false;}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
private Node addWaiter(Node mode) {//创建一个节点,将线程实例封装到Node节点内部,当前mode这里是为null的Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure//获取末尾节点Node pred = tail;//如果末尾节点不为空,则证明前面已经有线程排队啦if (pred != null) {//将末尾节点的引用交给当前线程的前节点node.prev = pred;// 通过cas,将当前线程设置为链表的尾节点if (compareAndSetTail(pred, node)) {// 将前尾节点的next引用,指向当前节点,那么当前节点就是链表尾节点了pred.next = node;return node;}}//构建链表核心方法enq(node);return node;}
private Node enq(final Node node) {//自旋for (;;) {//获取当前双向链表的末尾节点Node t = tail;//如果当前还没有双向链表,则进行构建链表if (t == null) { // Must initialize//初始化一个head节点,空的node节点if (compareAndSetHead(new Node()))//将head节点赋值给tail节点,这样的头尾节点都已生成好了tail = head;} else {//自旋第二次会进来//头节点设置为 空节点node.prev = t;//通过cas,将当前节点设置为末尾节点if (compareAndSetTail(t, node)) {//设置尾节点引用t.next = node;return t;}}}}

那么到此获取锁失败的线程,通过构建链表,然后假如到同步队列的逻辑到此就结束了,但是线程加入同步队列后会做什么我们并不清楚,这部分我们并不清楚,所以我们接着退回到acquire方法来,我们进入到acquireQueued方法内部,看看它里面做了些啥逻辑
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
第一个if判断中,首先会判断当前线程的前驱节点是否为头节点,如果是则尝试获取锁,获取锁成功则将当前线程节点设置为头节点,为什么必须是前驱节点才尝试去获取锁,因为正常情况下,头节点才是持有锁的线程,头节点线程释放掉锁后,会唤醒后驱节点的线程,这个时候被唤醒后才会进行获取锁。举个很形象的例子,就比如火车站窗口排队买火车票,此时有人正在购买火车票,而排在第二个人可以知道第一个人是否购买完成,如果买完了我就可以去买票了啊,如果没买完,那么第二个人后面的人就只能老老实实的进行排队购票啊。
那么第二个if判断就是如果前面有人在排队购票,那么我就只能老老实实排在后面进行等待呗。再举个很形象的例子哈,有一款很好玩的游戏,因为这款游戏还没发售需要进行预约,此时我并不知道它什么时候才会发售,所以就先预约排队嘛,当要发售了,就会有工作人员去通知你,你预约的游戏可以已经发售了,你可以进行购买了,这就是排队与通知的。
//Node: 为线程1的节点 arg:1final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//只有获取到锁的线程才会跳出循环for (;;) {//获取当前线程节点的 prev 节点final Node p = node.predecessor();// 第二个线程 第一个条件为true , 第二个条件为false,因为第二个条件是 抢锁逻辑if (p == head && tryAcquire(arg)) {setHead(node); //将当前节点更新为头节点p.next = null; // help GCfailed = false;return interrupted; //正常情况下死循环的唯一出口}// 将上一个节点设置为独占锁-1,条件返回fasle,第二次自旋进来if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞//阻塞当前线程parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
我们可以前驱节点pred的状态会进行不同处理
其实这个方法主要作用就是确保当前结点的前驱结点的状态为SIGNAL,SIGNAL意味着线程释放锁后会唤醒后面阻塞的线程。毕竟只有确保能够被唤醒,当前线程才能放心的阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) //状态为SIGNAL/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) { //状态为CANCELLED/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else { //状态为初始化状态(ReentrentLock语境下)/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
public void unlock() {sync.release(1);}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
protected final boolean tryRelease(int releases) {int c = getState() - releases; //计算待更新的state值if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) { //待更新的state值为0,说明持有锁的线程未重入,一旦释放锁其他线程将能获取free = true;setExclusiveOwnerThread(null); //清除锁的持有线程标记}setState(c); //更新state值return free;}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
private void unparkSuccessor(Node node) {// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现int ws = node.waitStatus;// 由于-1会小于0,所以更新改为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取第一个正常排队的节点Node s = node.next;//正常解锁流程不会走该if判断if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒if (s != null)LockSupport.unpark(s.thread);}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章


