并发编程之Condition解析
实现线程之间的通信,在Object类中提供等待/通知机制(并发编程之wait-notify解析)。而wait/notify有一个缺点如果有多个线程处于等待阻塞状态,单纯的使用notify是无法唤醒具体的线程的,只能任意唤醒一个或者使用notifyAll唤醒全部。再者,调用wait方法处于阻塞中的线程无法支持响应中断。
在Java并发包下,提供了一套新的基于等待/唤醒的API,即Condition。接下来我们对Condition相关源码做一个分析。

Condition是一个接口,其实现类有ConditionObject,在ConditionObject是一个AQS的内部类。其对应的方法有:
//阻塞等待void await()boolean await(long time, TimeUnit unit)long awaitNanos(long nanosTimeout)//通知唤醒void signal()void signalAll()
从其API中我们也可以看出在Object中的wait/notify类似。话不多说,开始进入源码环节。

入口的方法为await:
public final void await() throws InterruptedException {// 线程阻塞的话 直接抛出异常if (Thread.interrupted())throw new InterruptedException();// 当前需要阻塞的线程封装成Node插入到队列(等待队列)的尾部Node node = addConditionWaiter();// 释放所持有的锁资源// 因为调用await之前,必须先获得锁,这里会释放掉int savedState = fullyRelease(node);int interruptMode = 0;//判断节点是否在同步队列里面//如果在同步队列里面说明等待节点被唤醒了while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 尝试竞争资源获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
我们知道,调用await方法之前必须先获取到锁,在await方法中,会把当前线程封装为一个Node插入到队列的尾部,这里需要注意到的几点是:
此队列用于线程的等待,还不具体直接去竞争临界资源,这个队列我们以下称为等待队列
等待队列是单向的,跟AQS中的同步队列是双向的
等待队列无头结点,而同步队列最开始会初始化一个空的头结点
当前线程节点加入到队尾之后,需要释放锁资源fullyRelease,而这也是为什么在调用await的之前需要先获取锁。
然后判断当前节点是否在同步队列中isOnSyncQueue(被唤醒的节点会被加到同步队列中),如果当前节点在同步队列中发现了,说明已经被唤醒了。否则当前线程阻塞,等待被唤醒(被通知唤醒的逻辑接下来分析)。
当线程会唤醒之后,就会退出while语句,执行acquireQueued获取锁从而继续执行。
到这里,我们知道,一个等待的线程会被封装为Node,插入到等待队列的尾部(队列为单向链表),等待的被唤醒

那么接下来,我们来看看通知唤醒的逻辑,入口方法为signal,
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();// 获取等待队列的第一个节点唤醒Node first = firstWaiter;if (first != null)// 唤醒操作doSignal(first);}private void doSignal(Node first) {do {// 首节点是需要唤醒的,所以需要从队列中移除掉if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 节点由等待队列转移到同步队列中} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {// 插入到同步队列中Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
调用signal方法,首先会唤醒等待时间最长的,因为等待队列也是FIFO队列,所以首节点就是等待时间最长的节点,所以这里需要唤醒首节点。唤醒的节点会从等待队列中移除,然后把对应的节点从等待队列中加入到同步队列,可以看出,其实唤醒操作就是把等待队列中的节点加入到同步队列,因为只有同步队列中的节点才具有竞争临界资源的资格。

为了有助于理解,对于ConditionObject等待唤醒的逻辑,做出了以下的图:

线程尝试回去资源,获取到同步锁之后进入临界资源,然后调用await方法,以尾插法的形式进入等待队列,这里需要注意的是,等待队列是可以有多个的。当调用signal方法的时候,会把等待队列的首节点插入到同步队列的尾节点中,而等待节点中相关的节点会被删除。当调用signalAll方法的时候,会把单个的等待队列全部插入到同步队列中,在同步节点中的线程会尝试去竞争临界资源。

对Condition有了一个详细的认识后,我们来基于此实现一个等待队列
public class TBlockingQueue {private String[] arr ;int size ;public int startIndex;public int endIndex;ReentrantLock rx = new ReentrantLock();Condition fullCd = rx.newCondition();Condition emptyCd = rx.newCondition();public TBlockingQueue(int n){if(n<=0) throw new RuntimeException("n must > 0");arr = new String[n];}public void put(String data){if(arr == null || data == null) throw new RuntimeException("must be init or check your data");try {rx.lock();while (arr.length <= size) {fullCd.await();}arr[endIndex] = data;if(arr.length -1 == endIndex){endIndex = 0 ;} else {endIndex++ ;}size ++ ;emptyCd.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {rx.unlock();}}public String take(){try {rx.lock();while (size == 0){emptyCd.await();}String data = arr[startIndex];arr[startIndex] = null;if(arr.length -1 == startIndex){endIndex = 0 ;} else {startIndex++ ;}size -- ;fullCd.signalAll();return data;} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);} finally {rx.unlock();}}}
感谢你的阅读,有任何问题你可以在此公众号上与我交流,如果你觉得此文章对你有所收获的话,可以关注一波【南瓜小灯】嘛。学习的路上,期待着你我共同前行!!!
