JUC并发编程之Semaphore源码详解
在前面分享的一篇文章中,我分享到了ReentrantLock源码,它是基于AQS进行实现的,那么今天本文分享的同样也是基于AQS实现的Semaphore工具类
Semaphore字面意思是信号量的意思,这种说法并不能够很好的描述它的作用,更通俗的来说应该是令牌管理器。官方文档说明Semaphore是一个计数信号量,从概念来讲Semaphore包含了一组许可证。每个用户都必须拿到许可证才能访问后续资源,没有拿到许可证的用于则需要进行等待获取许可证。
这么说会稍微有点不好理解,举个很通俗易懂例子:
这个案例的人就是线程,而正在购票的操作表示线程正在执行业务逻辑,离开窗口则表示线程执行完毕,看到窗口有人正在购票则排队等待则表示线程即将入队阻塞。
public class SemaphoreTest {
public static void main(String[] args) {
// 声明3个窗口
Semaphore windows = new Semaphore(3, true);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
//占用窗口
windows.acquire();
System.out.println(Thread.currentThread().getName() + ":开始买票");
//模拟买票流程
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":购买成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
windows.release();
}
}).start();
}
}
}
源码分析
// 默认构造方法,走非公平锁流程,permits:支持令牌数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 根据fair属性决定是否走公平还是非公平锁流程,permits:支持令牌数量
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋
int available = getState(); //获得当前令牌数量
int remaining = available - acquires; // 计算令牌数量
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 还有令牌可用的话,则直接进行CAS获取令牌
return remaining;
}
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
进入到获取令牌失败的逻辑方法,主要的逻辑都在自旋里面,但是外面同样也有个比较重要的方法,就是addWaiter()方法,该方法传入的参数值为 "Node.SHARED" ,而SHARED的值就是new Node() 也就是创建了一个空的节点,然后我们来看看addWaiter()方法其内部逻辑做了些什么事情?
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封装节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 获取末尾节点
if (pred != null) {
node.prev = pred; // 当前节点的前驱引用指向为pred
if (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点
pred.next = node; // 原末尾节点后驱引用指向为当前节点
return node;
}
}
enq(node);
return node;
}
基于FIFO入队流程图
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 获取末尾节点
if (t == null) { // Must initialize // 构建双向链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取当前下一节点
if (s == null || s.isShared()) // 判断下节点是否为共享节点
doReleaseShared(); // 传播~~ 具体传播什么呢???
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
sync.releaseShared(1);
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //通用释放令牌
doReleaseShared(); //唤醒后驱节点
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
private void unparkSuccessor(Node node) {
// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
int ws = node.waitStatus;
// 由于-1会小于0,所以更新改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点
Node s = node.next;
//正常解锁流程不会走该if判断
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
那么到此Semaphore公平与非公平获取令牌的与释放令牌的流程,到此就结束啦
最后方法Semaphore源码详解视频,可结合博文一起观看
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章
评论