JUC并发编程之ReentrantLock非公平锁源码详解
共 17827字,需浏览 36分钟
·
2021-07-03 16:43
下图就是基于AQS所实现的一些比较强大的功能类,例如公平与非公平锁,信号量,线程池
// 标识拿到锁的是哪个线程
private transient Thread exclusiveOwnerThread;
// 标识头节点
private transient volatile Node head;
// 标识尾节点
private transient volatile Node tail;
// 同步状态,为0时,说明可以抢锁
private volatile int state;
// 记录Node状态
volatile int waitStatus;
// 标识Node是哪个线程所持有
volatile Node prev;
// 前驱节点(当前Node的上一个是谁)
volatile Node next;
// 后继节点(当前Node的个一个是谁)
volatile Thread thread;
下面是一个简单的lock案例,使用lock需要注意点,我们获取锁需要被try包裹,释放锁一定要在finally里面,为什么需要这么做呢?是因为如果多个线程同时进来抢锁,当其中一个线程抢到锁后,执行完业务逻辑还没来得及释放锁就发生了异常,就会导致锁没有被释放,从而该业务逻辑一直被卡住。所以为了避免这一现象的发生,我们的释放锁一定要写在finally里面啦~
public class ReentrantLockTest {
// 初始化Lock类
static final Lock lock = new ReentrantLock();
static int sum = 0;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock.lock();
for (int i1 = 0; i1 < 10000; i1++) {
sum++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
}
});
thread.start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(sum);
}
}
在ReentLock里面,其中有两个构造方法值得我们注意
public void lock() {
sync.lock();
}
首先尝试以最快的方式获取锁,当多个线程同时进来了,只会有其中一个线程以CAS的方式将state的值更新为1,前提条件是只有当state的值为0的时候才能更新成功,同时会记录获取锁的线程,若获取锁失败,则执行acquire方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//以cas的方式将AQS中的state属性值从0更新为1
if (compareAndSetState(0, 1))
//如果更新成功,则将当前线程设置为持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//获取锁失败,则进入该方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在ReentrantLock中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
找到关键点 -> 非公平类加锁入口
如下则是非公平锁的实现方式,在其底层调用了nonfairTryAcquire()方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//记录当前线程
final Thread current = Thread.currentThread();
//获取当前state的值
int c = getState();
//如果state的值为0,则证明没有线程抢占锁
if (c == 0) {
// 通过cas进行加锁操作
if (compareAndSetState(0, acquires)) {
// 记录加锁线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果锁已经被抢占,且当前线程就是持有锁的线程,则说明该锁被重入
else if (current == getExclusiveOwnerThread()) {
//计算要更新后的state值
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//给state设置值,该方式为非同步
setState(nextc);
//重入锁获取成功
return true;
}
//获取锁失败
return false;
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
//创建一个节点,将线程实例封装到Node节点内部,当前mode这里是为null的
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//获取末尾节点
Node pred = tail;
//如果末尾节点不为空,则证明前面已经有线程排队啦
if (pred != null) {
//将末尾节点的引用交给当前线程的前节点
node.prev = pred;
// 通过cas,将当前线程设置为链表的尾节点
if (compareAndSetTail(pred, node)) {
// 将前尾节点的next引用,指向当前节点,那么当前节点就是链表尾节点了
pred.next = node;
return node;
}
}
//构建链表核心方法
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
//获取当前双向链表的末尾节点
Node t = tail;
//如果当前还没有双向链表,则进行构建链表
if (t == null) { // Must initialize
//初始化一个head节点,空的node节点
if (compareAndSetHead(new Node()))
//将head节点赋值给tail节点,这样的头尾节点都已生成好了
tail = head;
} else {
//自旋第二次会进来
//头节点设置为 空节点
node.prev = t;
//通过cas,将当前节点设置为末尾节点
if (compareAndSetTail(t, node)) {
//设置尾节点引用
t.next = node;
return t;
}
}
}
}
那么到此获取锁失败的线程,通过构建链表,然后假如到同步队列的逻辑到此就结束了,但是线程加入同步队列后会做什么我们并不清楚,这部分我们并不清楚,所以我们接着退回到acquire方法来,我们进入到acquireQueued方法内部,看看它里面做了些啥逻辑
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第一个if判断中,首先会判断当前线程的前驱节点是否为头节点,如果是则尝试获取锁,获取锁成功则将当前线程节点设置为头节点,为什么必须是前驱节点才尝试去获取锁,因为正常情况下,头节点才是持有锁的线程,头节点线程释放掉锁后,会唤醒后驱节点的线程,这个时候被唤醒后才会进行获取锁。举个很形象的例子,就比如火车站窗口排队买火车票,此时有人正在购买火车票,而排在第二个人可以知道第一个人是否购买完成,如果买完了我就可以去买票了啊,如果没买完,那么第二个人后面的人就只能老老实实的进行排队购票啊。
那么第二个if判断就是如果前面有人在排队购票,那么我就只能老老实实排在后面进行等待呗。再举个很形象的例子哈,有一款很好玩的游戏,因为这款游戏还没发售需要进行预约,此时我并不知道它什么时候才会发售,所以就先预约排队嘛,当要发售了,就会有工作人员去通知你,你预约的游戏可以已经发售了,你可以进行购买了,这就是排队与通知的。
//Node: 为线程1的节点 arg:1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//只有获取到锁的线程才会跳出循环
for (;;) {
//获取当前线程节点的 prev 节点
final Node p = node.predecessor();
// 第二个线程 第一个条件为true , 第二个条件为false,因为第二个条件是 抢锁逻辑
if (p == head && tryAcquire(arg)) {
setHead(node); //将当前节点更新为头节点
p.next = null; // help GC
failed = false;
return interrupted; //正常情况下死循环的唯一出口
}
// 将上一个节点设置为独占锁-1,条件返回fasle,第二次自旋进来
if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
//阻塞当前线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我们可以前驱节点pred的状态会进行不同处理
其实这个方法主要作用就是确保当前结点的前驱结点的状态为SIGNAL,SIGNAL意味着线程释放锁后会唤醒后面阻塞的线程。毕竟只有确保能够被唤醒,当前线程才能放心的阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //状态为SIGNAL
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //状态为CANCELLED
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //状态为初始化状态(ReentrentLock语境下)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //计算待更新的state值
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //待更新的state值为0,说明持有锁的线程未重入,一旦释放锁其他线程将能获取
free = true;
setExclusiveOwnerThread(null); //清除锁的持有线程标记
}
setState(c); //更新state值
return free;
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
int ws = node.waitStatus;
// 由于-1会小于0,所以更新改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点
Node s = node.next;
//正常解锁流程不会走该if判断
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章