基于Java并发包分析CAS与AQS

Java有货

共 9992字,需浏览 20分钟

 ·

2021-03-06 04:39

点击上方「Java有货」关注我们


技术交流群添加方式


+



添加小编微信:372787553,备注:进群
带您进入Java技术交流群



A

CAS介绍与分析

⭐CAS 是什么?

CAS是英文单词CompareAndSwap的缩写,中文意思是:比较并替换。CAS需要有3个操作数:内存地址V,旧的预期值A,即将要更新的目标值B。


CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。


⭐分析

在Java中有 AtomicInteger 为代表的都是 通过cas 进行控制,这里以 incrementAndGet()为例,调用的代码如下:

    public final int incrementAndGet() {        return U.getAndAddInt(this, VALUE, 1) + 1;    }
@HotSpotIntrinsicCandidate public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; }
@HotSpotIntrinsicCandidate public final boolean weakCompareAndSetInt(Object o, long offset, int expected, int x) { return compareAndSetInt(o, offset, expected, x); }
@HotSpotIntrinsicCandidate public final native boolean compareAndSetInt(Object o, long offset, int expected,                                                 int x);

compareAndSetInt需要三个参数,分别是内存位置 offset,旧的预期值 expected和新的值 x。操作时,先从内存位置读取到值,然后和预期值expected比较。如果相等,则将此内存位置的值改为新值x,返回 true。如果不相等,说明和其他线程冲突了,则不做任何改变,返回 false。


这种机制在不阻塞其他线程的情况下避免了并发冲突,比独占锁的性能高很多。CAS 在 Java 的原子类和并发包中有大量使用。


⭐ 底层实现

CAS 主要分三步,读取-比较-修改。其中比较是在检测是否有冲突,如果检测到没有冲突后,其他线程还能修改这个值,那么 CAS 还是无法保证正确性。所以最关键的是要保证比较-修改这两步操作的原子性。


CAS 底层是靠调用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架构中的 compare and exchange 指令。在多核的情况下,这个指令也不能保证原子性,需要在前面加上 lock 指令。lock 指令可以保证一个 CPU 核心在操作期间独占一片内存区域。那么 这又是如何实现的呢?


在处理器中,一般有两种方式来实现上述效果:总线锁和缓存锁。在多核处理器的结构中,CPU 核心并不能直接访问内存,而是统一通过一条总线访问。总线锁就是锁住这条总线,使其他核心无法访问内存。这种方式代价太大了,会导致其他核心停止工作。而缓存锁并不锁定总线,只是锁定某部分内存区域。当一个 CPU 核心将内存区域的数据读取到自己的缓存区后,它会锁定缓存对应的内存区域。锁住期间,其他核心无法操作这块内存区域。


CAS 就是通过这种方式实现比较和交换操作的原子性的。值得注意的是, CAS 只是保证了操作的原子性,并不保证变量的可见性,因此变量需要加上 volatile 关键字。


⭐ ABA 问题

上面提到,CAS 保证了比较和交换的原子性。但是从读取到开始比较这段期间,其他核心仍然是可以修改这个值的。如果核心将 A 修改为 B,CAS 可以判断出来。但是如果核心将 A 修改为 B 再修改回 A。那么 CAS 会认为这个值并没有被改变,从而继续操作。这是和实际情况不符的。解决方案是加一个版本号。


AQS介绍与分析

⭐AQS 介绍

AQS 全称 AbstractQueuedSynchronizer。AQS 中有两个重要的成员:


  • 成员变量 state。用于表示锁现在的状态,用 volatile 修饰,保证内存一致性。同时所用对 state 的操作都是使用 CAS 进行的。state 为0表示没有任何线程持有这个锁,线程持有该锁后将 state 加1,释放时减1。多次持有释放则多次加减。

  • 还有一个双向链表,链表除了头节点外,每一个节点都记录了线程的信息,代表一个等待线程。这是一个 FIFO 的链表。

下面以 ReentrantLock 非公平锁的代码看看 AQS 的原理。

AQS分析

public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}

ReentrantLock 默认是非公平锁, 如果想实现公平锁,在创建时传入true 即可,从源码看,无论是 FairSync 还是NonfairSync 都继承了Sync ,而 Sync 又继承了 AbstractQueuedSynchronizer 即我们所说的AQS


请求锁时有三种可能:


  • 如果没有线程持有锁,则请求成功,当前线程直接获取到锁。

  • 如果当前线程已经持有锁,则使用 CAS 将 state 值加1,表示自己再次申请了锁,释放锁时减1。这就是可重入性的实现。

  • 如果由其他线程持有锁,那么将自己添加进等待队列。

⭐底层实现

    public void lock() {        sync.lock();    }
@ReservedStackAccess final void lock() { if (!initialTryLock()) acquire(1); }
// initialTryLock() 方法 final boolean initialTryLock() { Thread current = Thread.currentThread(); // 第一次尝试 cas //如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。 //此操作具有volatile读写的内存语义 // compareAndSetState(int expect, int update) // expect - 预期值 update - 新值 if (compareAndSetState(0, 1)) { // first attempt is unguarded // 这里调用的是AbstractOwnableSynchronizer的方法 // 设置当前拥有独占访问权限的线程。 setExclusiveOwnerThread(current); return true; } // 返回最后由 setExclusiveOwnerThread设置的线程,如果从未设置,则返回 null 。 else if (getExclusiveOwnerThread() == current) { // getState() 返回同步状态的当前值。此操作具有volatile读取的内存语义。 int c = getState() + 1; if (c < 0) // overflow throw new Error("Maximum lock count exceeded"); // 设置同步状态的值。此操作具有volatile写入的内存语义。 setState(c); return true; } else return false; }

// acquire(1); // 以独占模式获取,忽略中断。通过至少调用一次tryAcquire(int)实现 ,返回成功。 // 否则线程排队,可能反复阻塞和解除阻塞,调用tryAcquire(int)直到成功。 // 该方法可用于实现方法Lock.lock() 。 public final void acquire(int arg) { if (!tryAcquire(arg)) acquire(null, arg, false, false, false, 0L); }
// tryAcquire 需要根据 所看的Sync来决定 // 这里的代码与 initialTryLock 的逻辑类似,不做过多的讲解 protected final boolean tryAcquire(int acquires) { if (getState() == 0 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
// acquire(null, arg, false, false, false, 0L); final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued
/* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */
for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { //可能从tail重复遍历,取消已取消的节点,直到没有找到为止。 //将可能被重新链接成为下一个合格收购者的节点打开。 cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; // 尝试获取共享锁 try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { // 将node 赋值给 head node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) // 在共享模式下唤醒给定的节点 signalNextIfShared(node); if (interrupted) // 调用本线程的interrupt 进行线程中断 current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }

加锁的逻辑基本上我们整体的看了一遍,接下来我们在看一下如何进行解锁。

    // 解锁操作  public void unlock() {        sync.release(1);    }
// 解锁操作 public final boolean release(int arg) { if (tryRelease(arg)) { signalNext(head); return true; } return false; } // 这里与 initialTryLock() 方法相同 @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; // 返回最后由setExclusiveOwnerThread设置的线程,如果从未设置,则返回null 。 // 此方法不会强制执行任何同步或volatile字段访问。 if (getExclusiveOwnerThread() != Thread.currentThread()) throw new IllegalMonitorStateException(); boolean free = (c == 0); if (free) setExclusiveOwnerThread(null); setState(c); return free; }
private static void signalNext(Node h) { Node s; if (h != null && (s = h.next) != null && s.status != 0) { s.getAndUnsetStatus(WAITING); // 如果给定线程尚不可用,则为其提供许可。 LockSupport.unpark(s.waiter); } }

今日分享就到这里,如果感觉本文对您有所帮助,可以给小编点赞或转发让更多热爱技术的朋友一起学习!


添加微信进入程序员微信交流群:372787553

浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报