并发编程之Condition解析
实现线程之间的通信,在Object类中提供等待/通知机制(并发编程之wait-notify解析)。而wait/notify有一个缺点如果有多个线程处于等待阻塞状态,单纯的使用notify是无法唤醒具体的线程的,只能任意唤醒一个或者使用notifyAll唤醒全部。再者,调用wait方法处于阻塞中的线程无法支持响应中断。
在Java并发包下,提供了一套新的基于等待/唤醒的API,即Condition。接下来我们对Condition相关源码做一个分析。
Condition是一个接口,其实现类有ConditionObject,在ConditionObject是一个AQS的内部类。其对应的方法有:
//阻塞等待
void await()
boolean await(long time, TimeUnit unit)
long awaitNanos(long nanosTimeout)
//通知唤醒
void signal()
void signalAll()
从其API中我们也可以看出在Object中的wait/notify类似。话不多说,开始进入源码环节。
入口的方法为await:
public final void await() throws InterruptedException {
// 线程阻塞的话 直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 当前需要阻塞的线程封装成Node插入到队列(等待队列)的尾部
Node node = addConditionWaiter();
// 释放所持有的锁资源
// 因为调用await之前,必须先获得锁,这里会释放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断节点是否在同步队列里面
//如果在同步队列里面说明等待节点被唤醒了
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 尝试竞争资源获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
我们知道,调用await方法之前必须先获取到锁,在await方法中,会把当前线程封装为一个Node插入到队列的尾部,这里需要注意到的几点是:
此队列用于线程的等待,还不具体直接去竞争临界资源,这个队列我们以下称为等待队列
等待队列是单向的,跟AQS中的同步队列是双向的
等待队列无头结点,而同步队列最开始会初始化一个空的头结点
当前线程节点加入到队尾之后,需要释放锁资源fullyRelease,而这也是为什么在调用await的之前需要先获取锁。
然后判断当前节点是否在同步队列中isOnSyncQueue(被唤醒的节点会被加到同步队列中),如果当前节点在同步队列中发现了,说明已经被唤醒了。否则当前线程阻塞,等待被唤醒(被通知唤醒的逻辑接下来分析)。
当线程会唤醒之后,就会退出while语句,执行acquireQueued获取锁从而继续执行。
到这里,我们知道,一个等待的线程会被封装为Node,插入到等待队列的尾部(队列为单向链表),等待的被唤醒
那么接下来,我们来看看通知唤醒的逻辑,入口方法为signal,
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取等待队列的第一个节点唤醒
Node first = firstWaiter;
if (first != null)
// 唤醒操作
doSignal(first);
}
private void doSignal(Node first) {
do {
// 首节点是需要唤醒的,所以需要从队列中移除掉
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 节点由等待队列转移到同步队列中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 插入到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
调用signal方法,首先会唤醒等待时间最长的,因为等待队列也是FIFO队列,所以首节点就是等待时间最长的节点,所以这里需要唤醒首节点。唤醒的节点会从等待队列中移除,然后把对应的节点从等待队列中加入到同步队列,可以看出,其实唤醒操作就是把等待队列中的节点加入到同步队列,因为只有同步队列中的节点才具有竞争临界资源的资格。
为了有助于理解,对于ConditionObject等待唤醒的逻辑,做出了以下的图:
线程尝试回去资源,获取到同步锁之后进入临界资源,然后调用await方法,以尾插法的形式进入等待队列,这里需要注意的是,等待队列是可以有多个的。当调用signal方法的时候,会把等待队列的首节点插入到同步队列的尾节点中,而等待节点中相关的节点会被删除。当调用signalAll方法的时候,会把单个的等待队列全部插入到同步队列中,在同步节点中的线程会尝试去竞争临界资源。
对Condition有了一个详细的认识后,我们来基于此实现一个等待队列
public class TBlockingQueue {
private String[] arr ;
int size ;
public int startIndex;
public int endIndex;
ReentrantLock rx = new ReentrantLock();
Condition fullCd = rx.newCondition();
Condition emptyCd = rx.newCondition();
public TBlockingQueue(int n){
if(n<=0) throw new RuntimeException("n must > 0");
arr = new String[n];
}
public void put(String data){
if(arr == null || data == null) throw new RuntimeException("must be init or check your data");
try {
rx.lock();
while (arr.length <= size) {
fullCd.await();
}
arr[endIndex] = data;
if(arr.length -1 == endIndex){
endIndex = 0 ;
} else {
endIndex++ ;
}
size ++ ;
emptyCd.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rx.unlock();
}
}
public String take(){
try {
rx.lock();
while (size == 0){
emptyCd.await();
}
String data = arr[startIndex];
arr[startIndex] = null;
if(arr.length -1 == startIndex){
endIndex = 0 ;
} else {
startIndex++ ;
}
size -- ;
fullCd.signalAll();
return data;
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
rx.unlock();
}
}
}
感谢你的阅读,有任何问题你可以在此公众号上与我交流,如果你觉得此文章对你有所收获的话,可以关注一波【南瓜小灯】嘛。学习的路上,期待着你我共同前行!!!