基于Java并发包分析CAS与AQS
共 9992字,需浏览 20分钟
·
2021-03-06 04:39
点击上方「Java有货」关注我们
+
A
CAS介绍与分析
⭐CAS 是什么?
CAS是英文单词CompareAndSwap的缩写,中文意思是:比较并替换。CAS需要有3个操作数:内存地址V,旧的预期值A,即将要更新的目标值B。
CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。
⭐分析
在Java中有 AtomicInteger 为代表的都是 通过cas 进行控制,这里以 incrementAndGet()为例,调用的代码如下:
public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,
int expected,
int x) {
return compareAndSetInt(o, offset, expected, x);
}
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
int expected,
int x);
compareAndSetInt需要三个参数,分别是内存位置 offset,旧的预期值 expected和新的值 x。操作时,先从内存位置读取到值,然后和预期值expected比较。如果相等,则将此内存位置的值改为新值x,返回 true。如果不相等,说明和其他线程冲突了,则不做任何改变,返回 false。
这种机制在不阻塞其他线程的情况下避免了并发冲突,比独占锁的性能高很多。CAS 在 Java 的原子类和并发包中有大量使用。
⭐ 底层实现
CAS 主要分三步,读取-比较-修改。其中比较是在检测是否有冲突,如果检测到没有冲突后,其他线程还能修改这个值,那么 CAS 还是无法保证正确性。所以最关键的是要保证比较-修改这两步操作的原子性。
CAS 底层是靠调用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架构中的 compare and exchange 指令。在多核的情况下,这个指令也不能保证原子性,需要在前面加上 lock 指令。lock 指令可以保证一个 CPU 核心在操作期间独占一片内存区域。那么 这又是如何实现的呢?
在处理器中,一般有两种方式来实现上述效果:总线锁和缓存锁。在多核处理器的结构中,CPU 核心并不能直接访问内存,而是统一通过一条总线访问。总线锁就是锁住这条总线,使其他核心无法访问内存。这种方式代价太大了,会导致其他核心停止工作。而缓存锁并不锁定总线,只是锁定某部分内存区域。当一个 CPU 核心将内存区域的数据读取到自己的缓存区后,它会锁定缓存对应的内存区域。锁住期间,其他核心无法操作这块内存区域。
CAS 就是通过这种方式实现比较和交换操作的原子性的。值得注意的是, CAS 只是保证了操作的原子性,并不保证变量的可见性,因此变量需要加上 volatile 关键字。
⭐ ABA 问题
上面提到,CAS 保证了比较和交换的原子性。但是从读取到开始比较这段期间,其他核心仍然是可以修改这个值的。如果核心将 A 修改为 B,CAS 可以判断出来。但是如果核心将 A 修改为 B 再修改回 A。那么 CAS 会认为这个值并没有被改变,从而继续操作。这是和实际情况不符的。解决方案是加一个版本号。
B
AQS介绍与分析
⭐AQS 介绍
AQS 全称 AbstractQueuedSynchronizer。AQS 中有两个重要的成员:
成员变量 state。用于表示锁现在的状态,用 volatile 修饰,保证内存一致性。同时所用对 state 的操作都是使用 CAS 进行的。state 为0表示没有任何线程持有这个锁,线程持有该锁后将 state 加1,释放时减1。多次持有释放则多次加减。
还有一个双向链表,链表除了头节点外,每一个节点都记录了线程的信息,代表一个等待线程。这是一个 FIFO 的链表。
下面以 ReentrantLock 非公平锁的代码看看 AQS 的原理。
⭐AQS分析
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 默认是非公平锁, 如果想实现公平锁,在创建时传入true 即可,从源码看,无论是 FairSync 还是NonfairSync 都继承了Sync ,而 Sync 又继承了 AbstractQueuedSynchronizer 即我们所说的AQS
请求锁时有三种可能:
如果没有线程持有锁,则请求成功,当前线程直接获取到锁。
如果当前线程已经持有锁,则使用 CAS 将 state 值加1,表示自己再次申请了锁,释放锁时减1。这就是可重入性的实现。
如果由其他线程持有锁,那么将自己添加进等待队列。
⭐底层实现
public void lock() {
sync.lock();
}
final void lock() {
if (!initialTryLock())
acquire(1);
}
// initialTryLock() 方法
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 第一次尝试 cas
//如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。
//此操作具有volatile读写的内存语义
// compareAndSetState(int expect, int update)
// expect - 预期值 update - 新值
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 这里调用的是AbstractOwnableSynchronizer的方法
// 设置当前拥有独占访问权限的线程。
setExclusiveOwnerThread(current);
return true;
}
// 返回最后由 setExclusiveOwnerThread设置的线程,如果从未设置,则返回 null 。
else if (getExclusiveOwnerThread() == current) {
// getState() 返回同步状态的当前值。此操作具有volatile读取的内存语义。
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置同步状态的值。此操作具有volatile写入的内存语义。
setState(c);
return true;
} else
return false;
}
// acquire(1);
// 以独占模式获取,忽略中断。通过至少调用一次tryAcquire(int)实现 ,返回成功。
// 否则线程排队,可能反复阻塞和解除阻塞,调用tryAcquire(int)直到成功。
// 该方法可用于实现方法Lock.lock() 。
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
// tryAcquire 需要根据 所看的Sync来决定
// 这里的代码与 initialTryLock 的逻辑类似,不做过多的讲解
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// acquire(null, arg, false, false, false, 0L);
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
//可能从tail重复遍历,取消已取消的节点,直到没有找到为止。
//将可能被重新链接成为下一个合格收购者的节点打开。
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
// 尝试获取共享锁
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
// 将node 赋值给 head
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
// 在共享模式下唤醒给定的节点
signalNextIfShared(node);
if (interrupted)
// 调用本线程的interrupt 进行线程中断
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
加锁的逻辑基本上我们整体的看了一遍,接下来我们在看一下如何进行解锁。
// 解锁操作
public void unlock() {
sync.release(1);
}
// 解锁操作
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
// 这里与 initialTryLock() 方法相同
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 返回最后由setExclusiveOwnerThread设置的线程,如果从未设置,则返回null 。
// 此方法不会强制执行任何同步或volatile字段访问。
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
// 如果给定线程尚不可用,则为其提供许可。
LockSupport.unpark(s.waiter);
}
}
今日分享就到这里,如果感觉本文对您有所帮助,可以给小编点赞或转发让更多热爱技术的朋友一起学习!
添加微信进入程序员微信交流群:372787553