并发编程之Condition解析

南瓜小灯

共 4128字,需浏览 9分钟

 · 2021-04-04

实现线程之间的通信,在Object类中提供等待/通知机制(并发编程之wait-notify解析)。而wait/notify有一个缺点如果有多个线程处于等待阻塞状态,单纯的使用notify是无法唤醒具体的线程的,只能任意唤醒一个或者使用notifyAll唤醒全部。再者,调用wait方法处于阻塞中的线程无法支持响应中断。


在Java并发包下,提供了一套新的基于等待/唤醒的API,即Condition。接下来我们对Condition相关源码做一个分析。


022b338f36dd00fdb66490a7a7e094b7.webp



Condition是一个接口,其实现类有ConditionObject,在ConditionObject是一个AQS的内部类。其对应的方法有:

//阻塞等待void await()boolean await(long time, TimeUnit unit)long awaitNanos(long nanosTimeout)
//通知唤醒void signal()void signalAll()

从其API中我们也可以看出在Object中的wait/notify类似。话不多说,开始进入源码环节。


653a5f914c7d6945f47de6b6d0654fd3.webp



入口的方法为await:

public final void await() throws InterruptedException {    // 线程阻塞的话 直接抛出异常  if (Thread.interrupted())    throw new InterruptedException();      // 当前需要阻塞的线程封装成Node插入到队列(等待队列)的尾部  Node node = addConditionWaiter();  // 释放所持有的锁资源  // 因为调用await之前,必须先获得锁,这里会释放掉  int savedState = fullyRelease(node);  int interruptMode = 0;  //判断节点是否在同步队列里面  //如果在同步队列里面说明等待节点被唤醒了  while (!isOnSyncQueue(node)) {    LockSupport.park(this);    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)      break;  }  // 尝试竞争资源获取锁  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)    interruptMode = REINTERRUPT;  if (node.nextWaiter != null) // clean up if cancelled    unlinkCancelledWaiters();  if (interruptMode != 0)    reportInterruptAfterWait(interruptMode);}


我们知道,调用await方法之前必须先获取到锁,在await方法中,会把当前线程封装为一个Node插入到队列的尾部,这里需要注意到的几点是:


  • 此队列用于线程的等待,还不具体直接去竞争临界资源,这个队列我们以下称为等待队列

  • 等待队列是单向的,跟AQS中的同步队列是双向的

  • 等待队列无头结点,而同步队列最开始会初始化一个空的头结点


当前线程节点加入到队尾之后,需要释放锁资源fullyRelease,而这也是为什么在调用await的之前需要先获取锁。


然后判断当前节点是否在同步队列中isOnSyncQueue(被唤醒的节点会被加到同步队列中),如果当前节点在同步队列中发现了,说明已经被唤醒了。否则当前线程阻塞,等待被唤醒(被通知唤醒的逻辑接下来分析)。


当线程会唤醒之后,就会退出while语句,执行acquireQueued获取锁从而继续执行。


到这里,我们知道,一个等待的线程会被封装为Node,插入到等待队列的尾部(队列为单向链表),等待的被唤醒


022b338f36dd00fdb66490a7a7e094b7.webp



那么接下来,我们来看看通知唤醒的逻辑,入口方法为signal,

public final void signal() {  if (!isHeldExclusively())    throw new IllegalMonitorStateException();  // 获取等待队列的第一个节点唤醒    Node first = firstWaiter;  if (first != null)      // 唤醒操作    doSignal(first);}
private void doSignal(Node first) { do { // 首节点是需要唤醒的,所以需要从队列中移除掉 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 节点由等待队列转移到同步队列中 } while (!transferForSignal(first) && (first = firstWaiter) != null);}
final boolean transferForSignal(Node node) { // 插入到同步队列中 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}

调用signal方法,首先会唤醒等待时间最长的,因为等待队列也是FIFO队列,所以首节点就是等待时间最长的节点,所以这里需要唤醒首节点。唤醒的节点会从等待队列中移除,然后把对应的节点从等待队列中加入到同步队列,可以看出,其实唤醒操作就是把等待队列中的节点加入到同步队列,因为只有同步队列中的节点才具有竞争临界资源的资格。



653a5f914c7d6945f47de6b6d0654fd3.webp



为了有助于理解,对于ConditionObject等待唤醒的逻辑,做出了以下的图:


7caa68d1d69959c25f7a1698ba51ba76.webp


线程尝试回去资源,获取到同步锁之后进入临界资源,然后调用await方法,以尾插法的形式进入等待队列,这里需要注意的是,等待队列是可以有多个的。当调用signal方法的时候,会把等待队列的首节点插入到同步队列的尾节点中,而等待节点中相关的节点会被删除。当调用signalAll方法的时候,会把单个的等待队列全部插入到同步队列中,在同步节点中的线程会尝试去竞争临界资源。


022b338f36dd00fdb66490a7a7e094b7.webp



对Condition有了一个详细的认识后,我们来基于此实现一个等待队列

public class TBlockingQueue {
private String[] arr ;int size ;public int startIndex;public int endIndex;
ReentrantLock rx = new ReentrantLock();Condition fullCd = rx.newCondition();Condition emptyCd = rx.newCondition();
public TBlockingQueue(int n){ if(n<=0) throw new RuntimeException("n must > 0"); arr = new String[n];}
public void put(String data){ if(arr == null || data == null) throw new RuntimeException("must be init or check your data"); try { rx.lock(); while (arr.length <= size) { fullCd.await(); }
arr[endIndex] = data; if(arr.length -1 == endIndex){ endIndex = 0 ; } else { endIndex++ ; } size ++ ; emptyCd.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { rx.unlock(); }}

public String take(){ try { rx.lock();
while (size == 0){ emptyCd.await(); }
String data = arr[startIndex]; arr[startIndex] = null; if(arr.length -1 == startIndex){ endIndex = 0 ; } else { startIndex++ ; } size -- ; fullCd.signalAll(); return data; } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e); } finally { rx.unlock(); }}}

感谢你的阅读,有任何问题你可以在此公众号上与我交流,如果你觉得此文章对你有所收获的话,可以关注一波【南瓜小灯】嘛。学习的路上,期待着你我共同前行!!!

浏览 31
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报