Java多线程之ReentrantReadWriteLock读写锁
这里就JUC包中的ReentrantReadWriteLock读写锁做相关介绍
概述
前面介绍了ReentrantLock可重入锁,但其存在明显的弊端。对于读场景而言,实际完全可以允许多个线程同时访问,而不必使用独占锁来进行并发保护。故ReentrantReadWriteLock读写锁应运而生。其内部维护了两个锁——读锁、写锁。前者为共享锁,后者则为互斥锁。具体而言,读锁可以被多个线程同时获取,而写锁只能被一个线程获取;同时读锁、写锁之间也是互斥的,即一旦某个线程获取到了读锁,则其他线程不可以同时获得写锁。反之同理。具体地,读写锁支持公平、非公平锁两种实现方式,默认为非公平锁。在锁的获取方面,其与ReentrantLock可重入锁类似。即支持lock()、lockInterruptibly()阻塞式获取,也支持tryLock()、tryLock(long timeout, TimeUnit unit)实现非阻塞式获取。但tryLock()方法会破坏公平性,即使是一个公平的读写锁实例。故为了保证公平性,可使用支持超时的tryLock方法,同时将超时时间设为0即可——tryLock(0, TimeUnit.SECONDS)。而在条件变量Condition方面,仅写锁支持
实践
读写测试
现在分别就读读、写写、读写场景进行实践验证。示例代码如下所示
/**
* ReentrantReadWriteLock Test 1: 读写测试
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ReentrantReadWriteLockTest1 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* 测试: 读锁为共享锁
*/
@Test
public void test1() {
System.out.println("\n---------------------- Test 1 ----------------------");
for (int i=0; i<3; i++) {
Runnable task = new ReadTask( lock, "读任务 #"+i );
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
/**
* 测试: 写锁为互斥锁
*/
@Test
public void test2() {
System.out.println("\n---------------------- Test 2 ----------------------");
for (int i=0; i<3; i++) {
Runnable task = new WriteTask( lock, "写任务 #"+i );
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
/**
* 测试: 读写互斥
*/
@Test
public void test3() {
System.out.println("\n---------------------- Test 3 ----------------------");
for (int i=0; i<8; i++) {
Runnable task = null;
Boolean isReadTask = RandomUtils.nextBoolean();
if( isReadTask ) {
task = new ReadTask( lock, "读任务 #"+i );
} else {
task = new WriteTask( lock, "写任务 #"+i );
}
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 50*1000 ); } catch (Exception e) {}
}
/**
* 打印信息
* @param msg
*/
public static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
@AllArgsConstructor
private static class ReadTask implements Runnable{
private ReentrantReadWriteLock lock;
private String name;
@Override
public void run() {
// 获取读锁
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
readLock.lock();
info(name+ ": 成功获取读锁");
try {
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(1000, 3000));
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
info(name+ ": 释放读锁");
readLock.unlock();
}
}
}
@AllArgsConstructor
private static class WriteTask implements Runnable{
private ReentrantReadWriteLock lock;
private String name;
@Override
public void run() {
// 获取写锁
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
info(name+ ": 成功获取写锁");
try {
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(1000, 3000));
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
info(name+ ": 释放写锁");
writeLock.unlock();
}
}
}
}
从Test1、Test2的测试结果可以证明读锁是共享锁、写锁是互斥锁
从Test3的测试结果可以看出读写之间是互斥的
可重入性
ReentrantReadWriteLock同样是可重入的。当一个线程获取到读锁(或写锁)后,可以继续获取相应类型的锁。示例代码如下所示
/**
* ReentrantReadWriteLock Test 2: 可重入性
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ReentrantReadWriteLockTest2 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* 测试: 读锁具有可重入性
*/
@Test
public void test1() {
System.out.println("\n---------------------- Test 1 ----------------------\n");
for (int i=0; i<2; i++) {
Runnable runnable = new Task("Task"+i, lock.readLock(), lock.readLock());
threadPool.execute( runnable );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
}
/**
* 测试: 写锁具有可重入性
*/
@Test
public void test2() {
System.out.println("\n---------------------- Test 2 ----------------------\n");
for (int i=0; i<2; i++) {
Runnable runnable = new Task("Task"+i, lock.writeLock(), lock.writeLock());
threadPool.execute( runnable );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
}
/**
* 打印信息
* @param msg
*/
public static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
//String log = "["+time+"] "+ " <"+ thread +"> " + msg;
String log = "["+time+"] " + msg;
System.out.println(log);
}
@AllArgsConstructor
private static class Task implements Runnable{
private String name;
private Lock firstLock;
private Lock secondLock;
@Override
public void run() {
firstLock.lock();
info(name+ ": 成功获取锁 firstLock");
try {
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(1000, 3000));
methodA();
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
info(name+ ": 释放锁 firstLock");
firstLock.unlock();
}
}
private void methodA() {
secondLock.lock();
info(name+ ": 成功获取锁 secondLock");
try {
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(1000, 3000));
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
info(name+ ": 释放锁 secondLock");
secondLock.unlock();
}
}
}
}
测试结果,如下所示
锁升级、降级
所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然ReentrantReadWriteLock是不支持的。理由也很简单,读锁是可以多个线程同时持有的。若其中的一个线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他线程依然持有读锁
反之,ReentrantReadWriteLock是支持锁降级的,即写锁降级为读锁。当一个线程在获得写锁后,依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的场景也很常见,假设存在一个先写后读的方法。共计耗时5s。其中前1秒用于写操作、后4秒用于读操作。最简单的思路是对该方法从开始到结束全部使用写锁进行保护。但其实该方法后4秒的读操作完全没有必要使用写锁进行保护。因为这样会阻塞其他线程读锁的获取,效率较低。而如果通过写锁、读锁分别对前1秒、后4秒的操作进行控制,即先获取写锁、再释放写锁,然后获取读锁、再释放读锁的方案。则有可能导致并发问题,具体表现在执行该方法过程中,刚释放写锁、准备获取读锁时,其他线程恰好获取到了写锁并对数据进行了更新。而锁降级则为此场景提供了新的解决思路及方案。其一方面保证了安全,读锁在写锁释放前获取,另一方面保证了高效,因为读锁是共享的
/**
* ReentrantReadWriteLock Test 2: 锁升级、锁降级
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ReentrantReadWriteLockTest2 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* 测试: 锁升级
*/
@Test
public void test3() {
System.out.println("\n---------------------- Test 3 ----------------------\n");
Runnable runnable1 = new Task("Task", lock.readLock(), lock.writeLock());
threadPool.execute( runnable1 );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
}
/**
* 测试: 锁降级
*/
@Test
public void test4() {
System.out.println("\n---------------------- test4 ----------------------\n");
Runnable runnable = () -> {
try {
// 1. 获取写锁
lock.writeLock().lock();
info("成功获取写锁");
// 2. 获取读锁
lock.readLock().lock();
info("成功获取读锁");
} catch (Exception e) {
System.out.println( "Happen Exception: " + e.getMessage());
} finally {
// 3. 释放写锁
lock.writeLock().unlock();
info("释放写锁");
}
};
threadPool.execute( runnable );
// 延时等待, 保证写锁已经释放, 而仅仅持有读锁
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
for (int i=0; i<2; i++) {
Runnable runnable2 = new Task("Task"+i, lock.readLock(), lock.readLock());
threadPool.execute( runnable2 );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
}
}
从Test 3的测试结果可以看出。由于不支持锁升级,故其在持有读锁的条件下尝试获取写锁会被一直阻塞下去
从Test 4的测试结果可以看出锁降级是可行的。这里为了便于演示,故runnable一直未释放其持有的读锁。实际应用中需要将其释放掉
实现原理
基本结构
ReentrantReadWriteLock读写锁的实现过程同样依赖于AQS,其是对AQS中共享锁、互斥锁的应用。在构建ReentrantReadWriteLock读写锁实例过程中,一方面,其会创建AQS实现类Sync的实例,其中Sync根据公平性与否又可细分为NonfairSync、FairSync这两个子类。这两个子类通过实现Sync中的readerShouldBlock、writerShouldBlock抽象方法来保障公平与否这一特性;另一方面,还会相应地创建ReadLock、WriteLock实例,并通过持有Sync实例来进行对AQS的调用。而且在ReentrantReadWriteLock读写锁的实现中,其将AQS的state字段分为两部分来使用。具体地,state字段的高16位表示获取到读锁的次数;state字段的低16位表示获取到写锁的次数
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// 读锁变量
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁变量
private final ReentrantReadWriteLock.WriteLock writerLock;
// AQS实现类变量
final Sync sync;
// 获取读锁
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}
// 获取写锁
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}
/********************* 构造器 *********************/
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/************************************************/
// 内部类: AQS实现类
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// AQS的state字段的高16位, 表示获取到读锁的次数
static int sharedCount(int c) {
return c >>> SHARED_SHIFT;
}
// AQS的state字段的低16位, 表示获取到写锁的次数
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
}
// 内部类:非公平的Sync子类
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
...
}
final boolean readerShouldBlock() {
...
}
}
// 内部类:公平的Sync子类
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
...
}
final boolean readerShouldBlock() {
...
}
}
// 内部类: 读锁
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
// 内部类: 写锁
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
}
写锁
ReentrantReadWriteLock中的写锁是对AQS中互斥锁的使用。其使用方式是通过writeLock()获取写锁实例,然后分别通过写锁的lock()、unlock()方法进行加锁、解锁操作
在调用加锁lock()方法时,其首先会调用AQS的acquire()方法。而在Sync类中则提供了tryAcquire方法的实现。如果其返回true则加锁操作结束,否则其将会进入AQS的阻塞队列。同时为了支持公平、非公平两种实现版本,Sync类中定义了writerShouldBlock抽象方法,用于判断当前线程是否可以直接通过CAS获取锁。然后通过Sync的子类NonfairSync、FairSync来实现该方法。具体地,在NonfairSync类中,writerShouldBlock方法会直接返回false。即直接利用CAS获取锁;而在FairSync类中,writerShouldBlock方法需要调用AQS的hasQueuedPredecessors方法,即如果AQS阻塞队列中如果没有其他线程在排队才可以通过CAS获取锁
类似地,在调用解锁unlock()方法时,其首先会调用AQS的release()方法。而在Sync类中则提供了tryRelease方法的实现。如果返回true则说明锁已经完全被释放了,需要AQS唤醒队列中的其他线程
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// 写锁变量
private final ReentrantReadWriteLock.WriteLock writerLock;
// AQS实现类变量
final Sync sync;
// 获取写锁
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract boolean writerShouldBlock();
// 尝试获取AQS的独占锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// AQS的state字段值
int c = getState();
// 获取到写锁的次数
int w = exclusiveCount(c);
// c不为0, 说明读锁或写锁已经被某线程获取了
if (c != 0) {
// 要么 w为0, 说明已经有线程获取了读锁, 故返回false
// 要么 w不为0 即已经有线程获取了写锁, 但持锁线程并不是当前线程, 同样返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// w不为0 且 持锁线程数是当前线程, 即此处重入写锁
setState(c + acquires);
return true;
}
// c=0, 说明锁未被任何线程持有
// 根据是否需要保障公平性, 来确定下一步是否直接通过CAS获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
// 需要排队 或 加锁失败, 返回false
return false;
// 加锁成功
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 锁被完全释放了, 故需要返回true
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
}
// 内部类:非公平的Sync子类
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
// 总是返回false, 即允许不排队, 直接获取锁
return false;
}
}
// 内部类:公平的Sync子类
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
// 判断AQS队列中是否有其他线程在排队
return hasQueuedPredecessors();
}
}
// 内部类: 写锁
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
}
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 获取AQS的独占锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 需要子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 释放AQS的独占锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 需要子类去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
}
读锁
ReentrantReadWriteLock中的读锁是对AQS中共享锁的使用。其使用方式是通过readLock()获取读锁实例,然后分别通过读锁的lock()、unlock()方法进行加锁、解锁操作
在调用加锁lock()方法时,其首先会调用AQS的acquireShared()方法。而在Sync类中则提供了tryAcquireShared方法的实现。如果返回值小于0则进入AQS的阻塞队列,否则加锁操作结束。同时为了支持公平、非公平两种实现版本,Sync类中定义了readerShouldBlock抽象方法,用于判断当前线程是否可以直接通过CAS获取锁。然后通过Sync的子类NonfairSync、FairSync来实现该方法。具体地,在NonfairSync类中,readerShouldBlock方法会调用AQS的apparentlyFirstQueuedIsExclusive方法来判断AQS阻塞队列中排队的第一个节点是不是获取写锁的,如果是则放弃本次CAS操作;而在FairSync类中,readerShouldBlock方法同样需要调用AQS的hasQueuedPredecessors方法,即如果AQS阻塞队列中如果没有其他线程在排队,本次才尝试通过CAS获取锁
类似地,在调用解锁unlock()方法时,其首先会调用AQS的releaseShared()方法。而在Sync类中则提供了tryReleaseShared方法的实现。如果返回true则说明锁已经完全被释放了,需要AQS进一步唤醒队列中的其他线程
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// 写锁变量
private final ReentrantReadWriteLock.ReadLock readerLock;
// AQS实现类变量
final Sync sync;
// 获取读锁
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract boolean readerShouldBlock();
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 写锁被其他线程持有
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
}
// 内部类:非公平的Sync子类
static final class NonfairSync extends Sync {
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
// 内部类:公平的Sync子类
static final class FairSync extends Sync {
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
// 内部类: 读锁
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
}
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 获取AQS的共享锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 需要子类去实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 释放AQS的共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 需要子类去实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
}
参考文献
Java并发编程之美 翟陆续、薛宾田著