同步锁基本原理与实现
为充分利用机器性能,人们发明了多线程。但同时带来了线程安全问题,于是人们又发明了同步锁。
这个问题自然人人知道,但你真的了解同步锁吗?还是说你会用其中的上锁与解锁功能?
今天我们就一起来深入看同步锁的原理和实现吧!
一:同步锁的职责
同步锁的职责可以说就一个,限制资源的使用(线程安全从属)。
它一般至少会包含两个功能: 1. 给资源加锁;2. 给资源解锁;另外,它一般还有 等待/通知 即 wait/notify 的功能;
同步锁的应用场景:多个线程同时操作一个事务必须保证正确性;一个资源只能同时由一线程访问操作;一个资源最多只能接入k的并发访问;保证访问的顺序性;
同步锁的实现方式:操作系统调度实现;应用自行实现;CAS自旋;
同步锁的几个问题:
为什么它能保证线程安全?
锁等待耗CPU吗?
使用锁后性能下降严重的原因是啥?
二:同步锁的实现一:lock/unlock
其实对于应用层来说,非常多就是 lock/unlock , 这也是锁的核心。
AQS 是java中很多锁实现的基础,因为它屏蔽了很多繁杂而底层的阻塞操作,为上层抽象出易用的接口。
我们就以AQS作为跳板,先来看一下上锁的过程。为不至于陷入具体锁的业务逻辑中,我们先以最简单的 CountDownLatch 看看。
// 先看看 CountDownLatch 的基础数据结构,可以说是不能再简单了,就继承了 AQS,然后简单覆写了几个必要方法。// java.util.concurrent.CountDownLatch.Sync/*** Synchronization control For CountDownLatch.* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {// 只有一种情况会获取锁成功,即 state == 0 的时候return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;// 原始的锁数量是在初始化时指定的不可变的,每次释放一个锁标识int nextc = c-1;if (compareAndSetState(c, nextc))// 只有一情况会释放锁成功,即本次释放后 state == 0return nextc == 0;}}}private final Sync sync;
重点1,我们看看上锁过程,即 await() 的调用。
public void await() throws InterruptedException {// 调用 AQS 的接口,由AQS实现了锁的骨架逻辑sync.acquireSharedInterruptibly(1);}// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly/*** Acquires in shared mode, aborting if interrupted. Implemented* by first checking interrupt status, then invoking at least once* {@link #tryAcquireShared}, returning on success. Otherwise the* thread is queued, possibly repeatedly blocking and unblocking,* invoking {@link #tryAcquireShared} until success or the thread* is interrupted.* @param arg the acquire argument.* This value is conveyed to {@link #tryAcquireShared} but is* otherwise uninterpreted and can represent anything* you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 首先尝试获取锁,如果成功就不用阻塞了// 而从上面的逻辑我们看到,获取锁相当之简单,所以,获取锁本身并没有太多的性能消耗哟// 如果获取锁失败,则会进行稍后尝试,这应该是复杂而精巧的if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}/*** Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 首先将当前线程添加排队队尾,此处会保证线程安全,稍后我们可以看到final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 获取其上一节点,如果上一节点是头节点,就代表当前线程可以再次尝试获取锁了final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 先检测是否需要阻塞,然后再进行阻塞等待,阻塞由 LockSupport 底层支持// 如果阻塞后,将不会主动唤醒,只会由 unlock 时,主动被通知// 因此,此处即是获取锁的最终等待点// 操作系统将不会再次调度到本线程,直到获取到锁if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}// 如此线程安全地添加当前线程到队尾?CAS 保证/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// 检测是否需要进行阻塞/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/// 只有前置节点是 SIGNAL 状态的节点,才需要进行 阻塞等待,当然前置节点会在下一次循环中被设置好return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// park 阻塞实现/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {// 将当前 AQS 实例作为锁对象 blocker, 进行操作系统调用阻塞, 所以所有等待锁的线程将会在同一个锁前提下执行LockSupport.park(this);return Thread.interrupted();}
如上,上锁过程是比较简单明了的。加入一队列,然后由操作系统将线程调出。(那么操作系统是如何把线程调出的呢?有兴趣自行研究)
重点2. 解锁过程,即 countDown() 调用
public void countDown() {// 同样直接调用 AQS 的接口,由AQS实现了锁的释放骨架逻辑sync.releaseShared(1);}// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared/*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {// 调用业务实现的释放逻辑,如果成功,再执行底层的释放,如队列移除,线程通知等等// 在 CountDownLatch 的实现中,只有 state == 0 时才会成功,所以它只会执行一次底层释放// 这也是我们认为 CountDownLatch 能够做到多线程同时执行的效果的原因之一if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}/*** Release action for shared mode -- signals successor and ensures* propagation. (Note: For exclusive mode, release just amounts* to calling unparkSuccessor of head if it needs signal.)*/private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;// 队列不为空才进行释放if (h != null && h != tail) {int ws = h.waitStatus;// 看过上面的 lock 逻辑,我们知道只要在阻塞状态,一定是 Node.SIGNALif (ws == Node.SIGNAL) {// 状态改变成功,才进行后续的唤醒逻辑// 因为先改变状态成功,才算是线程安全的,再进行唤醒,否则进入下一次循环再检查if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 将头节点的下一节点唤醒,如有必要unparkSuccessor(h);}// 这里的 propagates, 是要传播啥呢??// 为什么只唤醒了一个线程,其他线程也可以动了?else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 唤醒下一个节点// 但如果下一节点已经取消等待了,那么就找下一个没最近的没被取消的线程进行唤醒// 唤醒只是针对一个线程的哟Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
重要3. 线程解锁的传播性?
因为从上一节的讲解中,我们看到,当用户调用 countDown 时,仅仅是让操作系统唤醒了 head 的下一个节点线程或者最近未取消的节点。那么,从哪里来的所有线程都获取了锁从而运行呢?
其实是在 获取锁的过程中,还有一点我们未看清:
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared/*** Acquires in shared uninterruptible mode.* @param arg the acquire argument*/private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {// 当countDown被调用后,head节点被唤醒,执行int r = tryAcquireShared(arg);if (r >= 0) {// 获取到锁后,设置node为下一个头节点,并把唤醒状态传播下去,而这里面肯定会做一些唤醒其他线程的操作,请看下文setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** Sets head of queue, and checks if successor may be waiting* in shared mode, if so propagating if either propagate > 0 or* PROPAGATE status was set.** @param node the node* @param propagate the return value from a tryAcquireShared*/private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,* or was recorded (as h.waitStatus either before* or after setHead) by a previous operation* (note: this uses sign-check of waitStatus because* PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,* or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 如果有必要,则做一次唤醒下一线程的操作// 在 countDown() 不会触发此操作,所以这里只是一个内部调用传播Node s = node.next;if (s == null || s.isShared())// 此处锁释放逻辑如上,总之,又是另一次的唤醒触发doReleaseShared();}}
到此,我们明白了它是怎么做到一个锁释放,所有线程可通行的。也从根本上回答了我们猜想,所有线程同时并发运行。然而并没有,它只是通过唤醒传播性来依次唤醒各个等待线程的。从绝对时间性上来讲,都是有先后关系的。以后可别再浅显说是同时执行了哟。
三、 锁的切换:wait/notify
上面看出,针对一个lock/unlock 的过程还是很简单的,由操作系统负责大头,实现代码也并不多。
但是针对稍微有点要求的场景,就会进行条件式的操作。比如:持有某个锁运行一段代码,但是,运行时发现某条件不满足,需要进行等待而不能直接结束,直到条件成立。即所谓的 wait 操作。
乍一看,wait/notify 与 lock/unlock 很像,其实不然。区分主要是 lock/unlock 是针对整个代码段的,而 wait/notify 则是针对某个条件的,即获取了锁不代表条件成立了,但是条件成立了一定要在锁的前提下才能进行安全操作。
那么,是否 wait/notify 也一样的实现简单呢?比如java的最基础类 Object 类就提供了 wait/notify 功能。
我们既然想一探究竟,还是以并发包下的实现作为基础吧,毕竟 java 才是我们的强项。
本次,咱们以 ArrayBlockingQueue#put/take 作为基础看下这种场景的使用先。
ArrayBlockingQueue 的put/take 特性就是,put当队列满时,一直阻塞,直到有可用位置才继续运行下一步。而take当队列为空时一样阻塞,直到队列里有数据才运行下一步。这种场景使用锁主不好搞了,因为这是一个条件判断。put/take 如下:
// java.util.concurrent.ArrayBlockingQueue#put/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 当队列满时,一直等待while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}// java.util.concurrent.ArrayBlockingQueue#takepublic E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 当队列为空时一直等待while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
看起来相当简单,完全符合人类思维。只是,这里使用的两个变量进行控制流程 notFull,notEmpty. 这两个变量是如何进行关联的呢?
在这之前,我们还需要补充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,何时才能运行呢?如上代码在各自的入队和出队完成之后进行通知就可以了。
// 与 put 对应,入队完成后,队列自然就不为空了,通知下 notEmpty 就好了/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;// 我已放入一个元素,不为空了notEmpty.signal();}// 与 take 对应,出队完成后,自然就不可能是满的了,至少一个空余空间。/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 我已移除一个元素,肯定没有满了,你们继续放入吧notFull.signal();return x;}
是不是超级好理解。是的。不过,我们不是想看 ArrayBlockingQueue 是如何实现的,我们是要论清 wait/notify 是如何实现的。因为毕竟,他们不是一个锁那么简单。
// 三个锁的关系,即 notEmpty, notFull 都是 ReentrantLock 的条件锁,相当于是其子集吧/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}// lock.newCondition() 是什么鬼?它是 AQS 中实现的 ConditionObject// java.util.concurrent.locks.ReentrantLock#newConditionpublic Condition newCondition() {return sync.newCondition();}// java.util.concurrent.locks.ReentrantLock.Sync#newConditionfinal ConditionObject newCondition() {// AQS 中定义return new ConditionObject();}
接下来,我们要带着几个疑问来看这个 Condition 的对象:
    1. 它的 wait/notify 是如何实现的?
    2. 它是如何与互相进行联系的?
    3. 为什么 wait/notify 必须要在外面的lock获取之后才能执行?
    4. 它与Object的wait/notify 有什么相同和不同点?
能够回答了上面的问题,基本上对其原理与实现也就理解得差不多了。
重点1. wait/notify 是如何实现的?
我们从上面可以看到,它是通过调用 await()/signal() 实现的,到底做事如何,且看下面。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加当前线程到 等待线程队列中,有 lastWaiter/firstWaiter 维护Node node = addConditionWaiter();// 释放当前lock中持有的锁,详情且看下文int savedState = fullyRelease(node);// 从以下开始,将不再保证线程安全性,因为当前的锁已经释放,其他线程将会重新竞争锁使用int interruptMode = 0;// 循环判定,如果当前节点不在 sync 同步队列中,那么就反复阻塞自己// 所以判断是否在 同步队列上,是很重要的while (!isOnSyncQueue(node)) {// 没有在同步队列,阻塞LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 当条件被满足后,需要重新竞争锁,详情看下文// 竞争到锁后,原样返回到 wait 的原点,继续执行业务逻辑if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 下面是异常处理,忽略if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}/*** Invokes release with current state value; returns saved state.* Cancels node and throws exception on failure.* @param node the condition node for this wait* @return previous sync state*/final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 预期的,都是释放锁成功,如果失败,说明当前线程并并未获取到锁,引发异常if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {// tryRelease 由客户端自定义实现if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// 如何判定当前线程是否在同步队列中或者可以进行同步队列?/*** Returns true if a node, always one that was initially placed on* a condition queue, is now waiting to reacquire on sync queue.* @param node the node* @return true if is reacquiring*/final boolean isOnSyncQueue(Node node) {// 如果上一节点还没有被移除,当前节点就不能被加入到同步队列if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果当前节点的下游节点已经存在,则它自身必定已经被移到同步队列中if (node.next != null) // If has successor, it must be on queuereturn true;/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it. It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/// 最终直接从同步队列中查找,如果找到,则自身已经在同步队列中return findNodeFromTail(node);}/*** Returns true if node is on sync queue by searching backwards from tail.* Called only when needed by isOnSyncQueue.* @return true if present*/private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}// 当条件被满足后,需要重新竞争锁,以保证外部的锁语义,因为之前自己已经将锁主动释放// 这个锁与 lock/unlock 时的一毛一样,没啥可讲的// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
总结一下 wait 的逻辑:
1. 前提:自身已获取到外部锁;
2. 将当前线程添加到 ConditionQueue 等待队列中;
3. 释放已获取到的锁;
4. 反复检查进入等待,直到当前节点被移动到同步队列中;
5. 条件满足被唤醒,重新竞争外部锁,成功则返回,否则继续阻塞;(外部锁是同一个,这也是要求两个对象必须存在依赖关系的原因)
6. wait前线程持有锁,wait后线程持有锁,没有一点外部锁变化;
重点2. 厘清了 wait, 接下来,我们看 signal() 通知唤醒的实现:
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/public final void signal() {// 只有获取锁的实例,才可以进行signal,否则你拿什么去保证线程安全呢if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;// 通知 firstWaiterif (first != null)doSignal(first);}/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {// 最多只转移一个 节点do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}// 将一个节点从 等待队列 移动到 同步队列中,即可参与下一轮竞争// 只有确实移动成功才会返回 true// 说明:当前线程是持有锁的线程// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 同步队列由 head/tail 指针维护Node p = enq(node);int ws = p.waitStatus;// 注意,此处正常情况下并不会唤醒等待线程,仅是将队列转移。// 因为当前线程的锁保护区域并未完成,完成后自然会唤醒其他等待线程// 否则将会存在当前线程任务还未执行完成,却被其他线程抢了先去,那接下来的任务当如何??if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
总结一下,notify 的功能原理如下:
    1. 前提:自身已获取到外部锁;
    2. 转移下一个等待队列的节点到同步队列中;
    3. 如果遇到下一节点被取消情况,顺延到再下一节点直到为空,至多转移一个节点;
    4. 正常情况下不做线程的唤醒操作;
所以,实现 wait/notify, 最关键的就是维护两个队列,等待队列与同步队列,而且都要求是在有外部锁保证的情况下执行。
到此,我们也能回答一个问题:为什么wait/notify一定要在锁模式下才能运行?
因为wait是等待条件成立,此时必定存在竞争需要做保护,而它自身又必须释放锁以使外部条件可成立,且后续需要做恢复动作;而notify之后可能还有后续工作必须保障安全,notify只是锁的一个子集。。。
四、通知所有线程的实现:notifyAll
有时条件成立后,可以允许所有线程通行,这时就可以进行 notifyAll, 那么如果达到通知所有的目的呢?是一起通知还是??
以下是 AQS 中的实现:
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAllpublic final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}
可以看到,它是通过遍历所有节点,依次转移等待队列到同步队列(通知)的,原本就没有人能同时干几件事的!
本文从java实现的角度去解析同步锁的原理与实现,但并不局限于java。道理总是相通的,只是像操作系统这样的大佬,能干的活更纯粹:比如让cpu根本不用调度一个线程。

腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)
面试:史上最全多线程面试题 !
最新阿里内推Java后端面试题
JVM难学?那是因为你没认真看完这篇文章

关注作者微信公众号 —《JAVA烂猪皮》
了解更多java后端架构知识以及最新面试宝典


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力
作者:等你归去来
出处:https://www.cnblogs.com/yougewe/p/11922194.html
