JUC并发编程之Semaphore源码详解

在前面分享的一篇文章中,我分享到了ReentrantLock源码,它是基于AQS进行实现的,那么今天本文分享的同样也是基于AQS实现的Semaphore工具类
Semaphore字面意思是信号量的意思,这种说法并不能够很好的描述它的作用,更通俗的来说应该是令牌管理器。官方文档说明Semaphore是一个计数信号量,从概念来讲Semaphore包含了一组许可证。每个用户都必须拿到许可证才能访问后续资源,没有拿到许可证的用于则需要进行等待获取许可证。
这么说会稍微有点不好理解,举个很通俗易懂例子:
这个案例的人就是线程,而正在购票的操作表示线程正在执行业务逻辑,离开窗口则表示线程执行完毕,看到窗口有人正在购票则排队等待则表示线程即将入队阻塞。

public class SemaphoreTest {public static void main(String[] args) {// 声明3个窗口Semaphore windows = new Semaphore(3, true);for (int i = 0; i < 10; i++) {new Thread(() -> {try {//占用窗口windows.acquire();System.out.println(Thread.currentThread().getName() + ":开始买票");//模拟买票流程Thread.sleep(3000);System.out.println(Thread.currentThread().getName() + ":购买成功");} catch (InterruptedException e) {e.printStackTrace();} finally {windows.release();}}).start();}}}
源码分析
// 默认构造方法,走非公平锁流程,permits:支持令牌数量public Semaphore(int permits) {sync = new NonfairSync(permits);}// 根据fair属性决定是否走公平还是非公平锁流程,permits:支持令牌数量public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}
final int nonfairTryAcquireShared(int acquires) {for (;;) { // 自旋int available = getState(); //获得当前令牌数量int remaining = available - acquires; // 计算令牌数量if (remaining < 0 ||compareAndSetState(available, remaining)) // 还有令牌可用的话,则直接进行CAS获取令牌return remaining;}}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
进入到获取令牌失败的逻辑方法,主要的逻辑都在自旋里面,但是外面同样也有个比较重要的方法,就是addWaiter()方法,该方法传入的参数值为 "Node.SHARED" ,而SHARED的值就是new Node() 也就是创建了一个空的节点,然后我们来看看addWaiter()方法其内部逻辑做了些什么事情?
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); //获取当前节点的前驱节点if (p == head) {int r = tryAcquireShared(arg); // 尝试获取令牌if (r >= 0) { // 获取令牌成功setHeadAndPropagate(node, r); //传播链表p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收failed = false;return; // 获取令牌成功,退出自旋}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 阻塞当前线程throw new InterruptedException();}} finally {// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态if (failed)cancelAcquire(node);}}
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); // 封装节点// Try the fast path of enq; backup to full enq on failureNode pred = tail; // 获取末尾节点if (pred != null) {node.prev = pred; // 当前节点的前驱引用指向为predif (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点pred.next = node; // 原末尾节点后驱引用指向为当前节点return node;}}enq(node);return node;}
基于FIFO入队流程图


private Node enq(final Node node) {for (;;) {Node t = tail; // 获取末尾节点if (t == null) { // Must initialize // 构建双向链表if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); //获取当前节点的前驱节点if (p == head) {int r = tryAcquireShared(arg); // 尝试获取令牌if (r >= 0) { // 获取令牌成功setHeadAndPropagate(node, r); //传播链表p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收failed = false;return; // 获取令牌成功,退出自旋}}if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞parkAndCheckInterrupt()) // 阻塞当前线程throw new InterruptedException();}} finally {// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态if (failed)cancelAcquire(node);}}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,* or was recorded (as h.waitStatus either before* or after setHead) by a previous operation* (note: this uses sign-check of waitStatus because* PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,* or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态(h = head) == null || h.waitStatus < 0) {Node s = node.next; // 获取当前下一节点if (s == null || s.isShared()) // 判断下节点是否为共享节点doReleaseShared(); // 传播~~ 具体传播什么呢???}}
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}

private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】Node h = head;if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点int ws = h.waitStatus; // 获取head的节点状态if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒continue; // loop to recheck casesunparkSuccessor(h); // 唤醒线程 去获取令牌}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去continue; // loop on failed CAS}if (h == head) // loop if head changedbreak; // 跳出当前循环}}

sync.releaseShared(1);public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { //通用释放令牌doReleaseShared(); //唤醒后驱节点return true;}return false;}
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】Node h = head;if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点int ws = h.waitStatus; // 获取head的节点状态if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒continue; // loop to recheck casesunparkSuccessor(h); // 唤醒线程 去获取令牌}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去continue; // loop on failed CAS}if (h == head) // loop if head changedbreak; // 跳出当前循环}}
private void unparkSuccessor(Node node) {// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现int ws = node.waitStatus;// 由于-1会小于0,所以更新改为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取第一个正常排队的节点Node s = node.next;//正常解锁流程不会走该if判断if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒if (s != null)LockSupport.unpark(s.thread);}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
那么到此Semaphore公平与非公平获取令牌的与释放令牌的流程,到此就结束啦
最后方法Semaphore源码详解视频,可结合博文一起观看
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章


评论
