Java ReentrantLock 源码解析与简单使用
共 31476字,需浏览 63分钟
·
2021-04-25 10:03
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
ReentrantLock是juc中的一个可重入锁独占锁,该锁分为公平锁和非公平锁。独占锁是这个锁一旦被一个线程占据其他线程就不能抢占,可重入锁是如果这个线程重复获取这个锁不需要进行等待,直接进入。公平锁和非公平锁是刚进入的线程是否可以和在队列中等待的第一个线程进行竞争,如果是非公平锁则刚进入的线程在当前锁空闲的时候可以和等待队列中的第一个线程进行竞争,如果是公平锁则不能,刚进入的线程必须进入队列等待。
一、java.util.concurrent.locks.Lock接口
public class ReentrantLock implements Lock, java.io.Serializable {
.....
}
从源码中我们可以看到ReentrantLock 实现了Lock接口。Lock接口是juc中定义的一个锁的接口。
public interface Lock {
// 获取锁
void lock();
// 获取锁的过程能响应中断
void lockInterruptibly() throws InterruptedException;
// 非阻塞式响应中断能立即返回,获取锁返回true反之返回false
boolean tryLock();
// 超时获取锁,在超时内或者未中断的情况下能获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
// 获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,当再次获取锁时才能从等待中返回
Condition newCondition();
}
进行加锁主要有lock()、tryLock()、lockInterruptibly()这三个方法。tryLock()方法只是尝试进行获取当前锁,获取到了则返回true,获取不到返回false,不会直接让线程加入到等待队列中进行阻塞等待。lock()方法则会直接让线程阻塞等待,使用unlock才能唤醒线程。lockInterruptibly()和lock()方法主要的区别在于对异常的处理。
二、公平锁介绍
从源码中我们可以看出ReentrantLock使用了适配器设计模式,ReentrantLock继承了lock方法,同时拥有Sync类,Lock的方法都是大部分调用Sync类实现的,Sync类继承了AbstractQueuedSynchronizer类,该类是队列同步器会将等待的线程加入到队列的末端等待唤醒。通过公平锁的例子,讲解一下lock几个主要的方法。
// ReentrantLock的lock方法调用了sync的lock方法
public void lock() {
sync.lock();
}
// 公平锁类
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// ReentrantLock公平锁模式下lock调用的方法
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前调用获取锁方法的线程
final Thread current = Thread.currentThread();
// 获取当前锁的状态,如果c=0表示还没有被解锁,可以进行竞争获取锁
int c = getState();
// 表明重入锁没有被抢占
if (c == 0) {
// 等待对象中没有其他需要处理的 就cas将锁的状态设置为acquires。公平锁与非公平锁主要的区别在于是否是要要让队列等待的先进行执行。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 把独占线程设置为自己
setExclusiveOwnerThread(current);
return true;
}
}
// 走到这里表明锁被锁住了
// 如果锁的独占线程是自己,则直接重入
else if (current == getExclusiveOwnerThread()) {
// 记录锁被重入的次数,在释放锁的时候判断是否满足释放条件
int nextc = c + acquires;
// 出现异常
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因为当前锁被独占了所有其他线程是无法获取这个锁的,state的状态只能被自己修改,所以不需要处理并发的情况。
setState(nextc);
// 获取锁成功
return true;
}
// 获取锁失败
return false;
}
}
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
// 同步状态不允许获取
// 如果不在队列,当前线程放入队列
// 可能会阻塞当前前程
// 如果当前线程在队列,将其移出队列
public final void acquire(int arg) {
// 先尝试tryAcquire方法获取锁,如果失败了不进行阻塞直接返回,调用acquireQueued将当前线程封装成一个节点并进行阻塞,加入到等待队列中。
// Node.EXCLUSIVE
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 将当前的线程封装成一个Node,在ReentrantLock锁是独占模式,所以是Node.EXCLUSIVE。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取等待队列中的最后一个节点
Node pred = tail;
if (pred != null) {
// 走到这里表明tail节点存在
// 将当前node的prev节点设置为tail节点
node.prev = pred;
// cas将tail节点设置为新的节点,如果设置成功表明tail节点没有被修改过,如果失败表明其他线程提前修改的tail节点,出现了冲突。
if (compareAndSetTail(pred, node)) {
// 如果没有出现冲突,表明当前节点添加节点到末尾成功
pred.next = node;
return node;
}
}
// tail节点被其他线程修改时走到这里
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 当tail节点不存在时或者tail节点cas替换失败时调用这个方法
private Node enq(final Node node) {
for (;;) {
// 获取队列的末尾节点
Node t = tail;
// 如果tail节点不存在表明这个队列还没有进行初始化或者已经被清空了
if (t == null) { // Must initialize
// cas初始化了一个队列,防止多个线程进行初始化出现冲突
if (compareAndSetHead(new Node()))
// 初始化队列
tail = head;
} else {
// 如果队列存在则不断调用这个方法进行cas替换,知道成功插入到队列末尾。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
// cas抢占锁的时候失败,将线程插入到等待队列中
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点就是head,表明当前线程可以尝试竞争锁
if (p == head && tryAcquire(arg)) {
// 走到这里表明当前node竞争到了锁
// 将当前节点设置为head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是head,或者竞争锁失败了,调用shouldParkAfterFailedAcquire让前一个节点完事儿了把自己唤醒,同时进入等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前一个node的waitStatus
int ws = pred.waitStatus;
// 如果已经是SIGNAL表明前一个节点一直知道要通知自己了,可以放心进行睡眠等待了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 走到这里表明前面一个节点不为SIGNAL
// 下面的方法是寻找一个非cancel节点,把它设置为signal用来唤醒自己
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 把前一个节点设置为signal用来唤醒自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
// 调用LockSupprt方法将当前线程进行阻塞
LockSupport.park(this);
return Thread.interrupted();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// 更新同步状态
// 同步状态是否允许被阻塞的线程获取
public final boolean release(int arg) {
// 1. 释放锁
if (tryRelease(arg)) {
// 2. 如果独占锁释放"完全",唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果重入次数为零表示可以进行释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 获取最后一个没有被cancel的节点
s = t;
}
if (s != null)
// 调用LockSupport.unpark释放下一个被阻塞且没有被取消的线程来竞争锁
LockSupport.unpark(s.thread);
}
简单使用
public class Main {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread thread1 = new Thread(()->{
System.out.println("thread1 wait lock");
lock.lock();
System.out.println("thread1 get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("thread1 unlock");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 wait lock");
lock.lock();
System.out.println("thread2 get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("thread2 unlock");
});
thread1.start();
thread2.start();
}
}
————————————————
版权声明:本文为CSDN博主「zhanghl111」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:
https://blog.csdn.net/zhlily1/article/details/115794503
粉丝福利:Java从入门到入土学习路线图
👇👇👇
👆长按上方微信二维码 2 秒
感谢点赞支持下哈