Condition实现原理

共 13974字,需浏览 28分钟

 ·

2021-07-28 08:43

 作者 |  生活咖啡

来源 |  cnblogs.com/vielat/p/15022895.html

Condition接口提供了与Object阻塞(wait())与唤醒(notify()notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition

一、基本使用

基于Condition实现生产者、消费者模式。代码基本与Object#wait()Object#notify()类似,只不过我们使用Lock替换了synchronized关键字。
生产者

public class Producer implements Runnable {
    private Lock lock;
    private Condition condition;
    private Queue<String> queue;
    private int maxSize;

    public Producer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
        this.lock = lock;
        this.condition = condition;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        int i = 0;
        for (; ; ) {
            lock.lock();
            // 如果满了,则阻塞
            while (queue.size() == maxSize) {
                System.out.println("生产者队列满了,等待...");
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            queue.add("一个消息:" + ++i);
            System.out.printf("生产者%s生产了一个消息:%s\n", Thread.currentThread().getName(), i);
            condition.signal();
            lock.unlock();
        }
    }
}

消费者

public class Consumer implements Runnable {
    private Lock lock;
    private Condition condition;
    private Queue<String> queue;
    private int maxSize;

    public Consumer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
        this.lock = lock;
        this.condition = condition;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        for (; ; ) {
            lock.lock();
            while (queue.isEmpty()) {
                System.out.println("消费者队列为空,等待...");
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String obj = queue.remove();
            System.out.printf("消费者%s消费一个消息:%s\n", Thread.currentThread().getName(), obj);
            condition.signal();
            lock.unlock();
        }
    }
}

测试类

public class ConditionProducerConsumer {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Queue<String> queue = new LinkedBlockingQueue<>();
        int maxSize = 10;

        Producer producer = new Producer(lock, condition, queue, maxSize);
        Consumer consumer = new Consumer(lock, condition, queue, maxSize);

        new Thread(producer).start();
        new Thread(consumer).start();

    }
}

二、源码分析

上述示例中使用的LockReentrantLock,关于它的lock方法与unlock方法的原理详见ReentrantLock实现原理。上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:

public class ReentrantLock implements Lock, java.io.Serializable {
 ...
 public Condition newCondition() {
        return sync.newCondition();
    }
 
 abstract static class Sync extends AbstractQueuedSynchronizer {
  ...
  final ConditionObject newCondition() {
            return new ConditionObject();
        }
  ...
 }
 ...
}

上述的ConditionObject定义在AQS中,如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
 ...
 public class ConditionObject implements Condition, java.io.Serializable {
  ...
 }
 ...
}

首先来分析下Condition#await()方法

public final void await() throws InterruptedException {
 if (Thread.interrupted())
  throw new InterruptedException();
 Node node = addConditionWaiter();
 int savedState = fullyRelease(node);
 int interruptMode = 0;
 while (!isOnSyncQueue(node)) {
  LockSupport.park(this);
  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
   break;
 }
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  interruptMode = REINTERRUPT;
 if (node.nextWaiter != null) // clean up if cancelled
  unlinkCancelledWaiters();
 if (interruptMode != 0)
  reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
 Node t = lastWaiter;
 // If lastWaiter is cancelled, clean out.
 if (t != null && t.waitStatus != Node.CONDITION) {
  unlinkCancelledWaiters();
  t = lastWaiter;
 }
 Node node = new Node(Thread.currentThread(), Node.CONDITION);
 if (t == null)
  firstWaiter = node;
 else
  t.nextWaiter = node;
 lastWaiter = node;
 return node;
}

根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:

假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行
Condition#await()方法,则会将当前线程添加至Condition队列中,如下:

然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:

在调用isOnSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。

假设当前线程A是生产者线程,调用await()方法后,会释放锁,并且将当前线程加入到Condition队列中。此时,消费者能获取到锁资源,然后继续执行。假设线程B是消费者线程,当添加一个元素后会调用condition#signal()方法,定义如下:

public final void signal() {
 if (!isHeldExclusively())
  throw new IllegalMonitorStateException();
 Node first = firstWaiter;
 if (first != null)
  doSignal(first);
}

private void doSignal(Node first) {
       do {
           if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
           first.nextWaiter = null;
           } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
}

执行signal()方法,会将Condition队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:

至此,完成了Condition队列转换为同步队列的过程。后续流程基本就是重复以上操作。

本文详细介绍了单个Condition队列的执行流程,其实一个Lock中可以有多个Condition队列,比如:JUC中提供的LinkedBlockingDequeArrayBlockingQueue


最近给大家找了  通用权限系统

资源,怎么领取?


扫二维码,加我微信,回复:通用权限系统

 注意,不要乱回复 

没错,不是机器人
记得一定要等待,等待才有好东西
浏览 42
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报