分布式锁(三):基于Redisson的分布式锁实践
共 16414字,需浏览 33分钟
·
2022-05-10 15:11
Redisson是基于Redis的Java驻内存数据网格(In-Memory Data Grid),底层使用Netty进行实现。其提供了相应的分布式锁实现
RedissonLock 分布式非公平可重入互斥锁
RedissonLock是一个分布式非公平可重入互斥锁,其在获取锁的过程中,支持lock阻塞式、tryLock非阻塞式两种形式。其中,这两个方法还有多个重载版本,以支持设置锁的最大持有时间、设置获取锁的最大等待时间。具体方法如下所示
# 阻塞式获取锁
void lock();
# 阻塞式获取锁
# 支持通过leaseTime参数设置锁的最大持有时间
void lock(long leaseTime, TimeUnit unit);
# 非阻塞式获取锁
boolean tryLock();
# 非阻塞式获取锁
# 支持通过time参数设置获取锁的最大等待时间
boolean tryLock(long time, TimeUnit unit);
# 非阻塞式获取锁
# 支持通过waitTime参数设置获取锁的最大等待时间
# 支持通过leaseTime参数设置锁的最大持有时间
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
可重入性
现在我们验证下RedissonLock是一个互斥锁并且支持可重入。示例代码如下所示
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static RedissonClient redissonClient;
@BeforeClass
public static void init() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
redissonClient = Redisson.create( config );
}
/**
* 测试: 阻塞式获取锁、可重入性
*/
@Test
public void testLock1() {
final String lockName = "sellPc";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
try{
// 阻塞式获取锁
lock.lock();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
lock.lock();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
lock.unlock();
info("释放锁 #1\n");
lock.unlock();
}
};
RLock tempLock = redissonClient.getLock(lockName);
if( tempLock instanceof RedissonLock) {
System.out.println("锁类型: RedissonLock");
}
try{ Thread.sleep( 1*1000 ); } catch (Exception e) {}
for (int i=1; i<=3; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}
@AfterClass
public static void close() {
redissonClient.shutdown();
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}
测试结果如下所示,符合预期
显式指定锁的最大持有时间
前面提到支持通过leaseTime参数显式设置锁的最大持有时间,当业务持锁时间超过leaseTime参数值,则其持有的锁会被自动释放。但需要注意的是某个线程的锁一旦被自动释放后,此时再调用unlock方法来释放锁时,即会抛出IllegalMonitorStateException异常。原因也很简单,因为此时线程实际上并未持有锁,示例代码如下所示
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
...
/**
* 测试: 指定锁的最大持有时间
*/
@Test
public void testLock2() {
final String lockName = "sellBooK";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
try{
// 阻塞式获取锁, 指定锁的最大持有时间为1秒
lock.lock(1, TimeUnit.SECONDS);
info("成功获取锁");
// 模拟业务耗时: 10s
Thread.sleep( 10 * 1000 );
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
} finally {
info("释放锁");
try {
lock.unlock();
} catch (IllegalMonitorStateException e) {
info("Happen Exception: " + e.getMessage());
}
}
};
for (int i=1; i<=5; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}
...
}
测试结果如下所示,符合预期。可以看到每隔1秒后,由于线程持锁时间到期了。锁被自动释放了,进而使得下一个任务拿到了锁。并且由于每个任务的锁都是自动释放的,故每次调用unlock方法均会抛出异常
非阻塞式获取锁
下面展示如果通过tryLock方法进行非阻塞式获取锁,示例代码如下所示
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
...
/**
* 测试: 非阻塞式获取锁
*/
@Test
public void testTryLock() {
final String lockName = "sellPhone";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
boolean flag = false;
try{
// 非阻塞式获取锁
flag = lock.tryLock();
if( flag ) {
info("成功获取锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} else {
info("未获取到锁\n");
}
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
if( flag ) {
info("释放锁\n");
lock.unlock();
}
}
};
for (int i=1; i<=5; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}
...
}
测试结果如下所示,符合预期
Watch Dog看门狗机制
在介绍Redisson的Watch Dog看门狗机制之前,我们先来做个测试。如果某个线程一直持有锁执行业务逻辑,则锁是否会被自动释放呢?示例代码如下所示
/**
* RedissonLock Demo : 分布式非公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonLockDemo {
...
/**
* 测试: 看门狗机制
*/
@Test
public void testLock3() {
final String lockName = "sellPig";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getLock(lockName);
try{
info("尝试获取锁");
// 阻塞式获取锁
lock.lock();
info("成功获取锁");
// 未显式指定锁的持有时间, 则看门狗会在断开连接前一直进行续期
while (true) {
}
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
} finally {
info("成功释放锁");
lock.unlock();
}
};
for (int i=1; i<5; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
info("---------------------- 系统下线 ----------------------");
}
...
}
测试结果如下所示,该锁被持有后,一直未被释放。其他任务都被阻塞住了
当我们调用不含leaseTime参数版本的lock()方法时,即未显式设置最大持锁时间。则其在RedissonLock类内部会将 特殊值-1 传给leaseTime参数。然后在tryAcquireAsync方法中会通过RedissonLock类的internalLockLeaseTime字段设置一个默认的最大持锁时间。最后通过RedissonLock构造器我们不难发现 internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段。其中lockWatchdogTimeout字段的默认值为30秒。换言之即使我们调用lock方法时,未显式设置最大持锁时间。但RedissonLock内部也会通过lockWatchdogTimeout字段给该锁设置一个最大持有时间,默认值为30秒
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
/**
* 未显式设置最大持锁时间的lock方法
*/
@Override
public void lock() {
try {
// 则其会将 特殊值-1 传给leaseTime参数
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
...
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
...
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 未显式设置最大持锁时间 则会 通过 internalLockLeaseTime 字段设置一个默认的最大持锁时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
}
...
public class Config {
// 时间: 30秒
private long lockWatchdogTimeout = 30 * 1000;
public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}
...
}
那问题来了,为啥在我们刚刚的测试代码中即使持锁时间超过了30秒,锁也没有被自动释放呢?原因就在于Redisson的看门狗机制。在RedissonLock类的tryAcquireAsync方法中,未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期。即RedissonLock类的tryAcquireAsync方法中会调用scheduleExpirationRenewal()以启动一个定时任务用于进行自动续期。具体的续期逻辑在RedissonBaseLock类的renewExpiration方法中,其中自动续期定时任务的执行周期 是RedissonBaseLock类的internalLockLeaseTime字段值的1/3。然后通过renewExpirationAsync方法每次利用Lua脚本向Redis发送续期命令,具体地。每次续期时会将RedissonBaseLock类的internalLockLeaseTime字段值设置为新的最大持锁时间。同样地,RedissonBaseLock类的 internalLockLeaseTime 字段值也是来自于Config类的lockWatchdogTimeout字段,即默认为30秒
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected long internalLockLeaseTime;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
// internalLockLeaseTime 参数的值来自于Config类的lockWatchdogTimeout变量
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 自动续期
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 通过Lua脚本向Redis发送续期命令
RFuture future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// 定时任务的执行周期 是 internalLockLeaseTime字段值的 1/3
// 即, 定时任务的执行周期 是 lockWatchdogTimeout字段值的 1/3
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
// 每次续期时会将internalLockLeaseTime字段值作为新的最大持锁时间
Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));
}
...
}
至此我们应该比较清楚Redisson的看门狗机制了:
Redisson的Watch Dog看门狗机制只会在未显式设置最大持锁时间才会生效。换言之,一旦调用lock方法时指定了leaseTime参数值,则该锁到期后即会自动释放。Redisson的Watch Dog看门狗不会对该锁进行自动续期 当我们未显式设置Config类的lockWatchdogTimeout字段值时,使用默认的30秒。此时如果加锁时未显式设置最大持锁时间,即Watch Dog看门狗机制会生效的场景中。该锁实际上一开始也会设置一个默认的最大持锁时间,即30秒。然后看门狗每隔10秒(30秒 * 1/3 = 10秒)会将该锁的最大持锁时间再次设置为30秒,以达到自动续期的目的。这样只要持锁线程的业务还未执行完,则该锁就一直有效、不会被自动释放。当然一旦持锁的服务实例发生宕机后,看门狗的定时任务自然也无法续期。这样锁到期后也就释放掉了,避免了死锁的发生
RedissonFairLock 分布式公平可重入互斥锁
由于RedissonLock是非公平的,故Redisson提供了一个分布式公平可重入互斥锁——RedissonFairLock。示例代码如下所示
/**
* RedissonFairLock Demo : 分布式公平可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonFairLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
/**
* 测试: 可重入性
*/
@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellWatch";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getFairLock(lockName);
try{
// 阻塞式获取锁
lock.lock();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
lock.lock();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
lock.unlock();
info("释放锁 #1\n");
lock.unlock();
}
};
RLock tempLock = redissonClient.getFairLock(lockName);
if( tempLock instanceof RedissonFairLock ) {
System.out.println("锁类型: RedissonFairLock");
}
try{ Thread.sleep( 4*1000 ); } catch (Exception e) {}
for (int i=1; i<=3; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}
测试结果如下所示,符合预期
RReadWriteLock 分布式读写锁
读写测试
RReadWriteLock是一个分布式可重入读写锁。其中读锁为可重入的共享锁、写锁为可重入的互斥锁,且读写互斥。示例代码如下所示
/**
* RReadWriteLock Demo : 分布式可重入读写锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RReadWriteLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static RedissonClient redissonClient;
@BeforeClass
public static void init() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
redissonClient = Redisson.create( config );
}
/**
* 测试: 读锁为共享锁, 读锁具有可重入性
*/
@Test
public void test1Read() {
System.out.println("\n---------------------- Test 1 : Read ----------------------");
String lockName = "sellCat";
for(int i=1; i<=3; i++) {
String taskName = "读任务#"+i;
Runnable task = new ReadTask(taskName, redissonClient, lockName);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
/**
* 测试: 写锁为互斥锁, 写锁具有可重入性
*/
@Test
public void test2Write() {
System.out.println("\n---------------------- Test 2 : Write ----------------------");
String lockName = "sellDog";
for(int i=1; i<=3; i++) {
String taskName = "写任务#"+i;
Runnable task = new WriteTask(taskName, redissonClient, lockName);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
/**
* 测试: 读写互斥
*/
@Test
public void test3ReadWrite() {
System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
String lockName = "sellLion";
for(int i=1; i<=5; i++) {
Runnable task = null;
Boolean isReadTask = RandomUtils.nextBoolean();
if( isReadTask ) {
task = new ReadTask( "读任务#"+i, redissonClient, lockName);
} else {
task = new WriteTask( "写任务#"+i, redissonClient, lockName);
}
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
@AfterClass
public static void close() {
redissonClient.shutdown();
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
/**
* 读任务
*/
private static class ReadTask implements Runnable {
private String taskName;
private RedissonReadLock readLock;
public ReadTask(String taskName, RedissonClient redissonClient, String lockName) {
this.taskName = taskName;
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
this.readLock = (RedissonReadLock) readWriteLock.readLock();
}
@Override
public void run() {
try{
readLock.lock();
info(taskName + ": 成功获取读锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.lock();
info(taskName + ": 成功获取读锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放读锁 #2");
readLock.unlock();
info(taskName + ": 释放读锁 #1");
readLock.unlock();
}
}
}
/**
* 写任务
*/
private static class WriteTask implements Runnable {
private String taskName;
private RedissonWriteLock writeLock;
public WriteTask(String taskName, RedissonClient redissonClient, String lockName) {
this.taskName = taskName;
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
this.writeLock = (RedissonWriteLock) readWriteLock.writeLock();
}
@Override
public void run() {
try{
writeLock.lock();
info(taskName + ": 成功获取写锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.lock();
info(taskName + ": 成功获取写锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放写锁 #2");
writeLock.unlock();
info(taskName + ": 释放写锁 #1\n");
writeLock.unlock();
}
}
}
}
读锁测试结果如下所示,符合预期
写锁测试结果如下所示,符合预期
读写测试结果如下所示,符合预期
锁升级、锁降级
所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。
锁升级示例代码如下所示
/**
* RReadWriteLock Demo : 分布式可重入读写锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RReadWriteLockDemo {
...
/**
* 测试: 锁升级
*/
@Test
public void test4Read2Write() {
System.out.println("---------------------- Test 4 : Read -> Write ----------------------\n");
String lockName = "sellTiger";
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock();
RedissonWriteLock writeLock = (RedissonWriteLock) readWriteLock.writeLock();
try {
readLock.lock();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.lock();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.unlock();
info("成功释放读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.unlock();
info("成功释放写锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
System.out.println("---------------------- 系统下线 ----------------------");
}
}
测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞
锁降级示例代码如下所示
/**
* RReadWriteLock Demo : 分布式可重入读写锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RReadWriteLockDemo {
...
/**
* 测试: 锁降级
*/
@Test
public void test5Write2Read() {
System.out.println("---------------------- Test 2 : Write -> Read ----------------------");
String lockName = "sellChicken";
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock();
RedissonWriteLock writeLock = (RedissonWriteLock) readWriteLock.writeLock();
try {
writeLock.lock();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.lock();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.unlock();
info("成功释放写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.unlock();
info("成功释放读锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
System.out.println("---------------------- 系统下线 ----------------------");
}
}
测试结果如下所示,符合预期
RedissonSpinLock 分布式非公平可重入自旋互斥锁
RedissonSpinLock则是一个分布式非公平可重入自旋互斥锁。示例代码如下所示
/**
* RedissonSpinLock Demo : 分布式非公平可重入自旋互斥锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonSpinLockDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
/**
* 测试: 可重入性、互斥锁
*/
@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellKeyword";
Runnable task = () -> {
// 设置分布式锁
RLock lock = redissonClient.getSpinLock(lockName);
try{
// 阻塞式获取锁
lock.lock();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
lock.lock();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #2");
lock.unlock();
info("释放锁 #1\n");
lock.unlock();
}
};
RLock tempLock = redissonClient.getSpinLock(lockName);
if( tempLock instanceof RedissonSpinLock) {
System.out.println("锁类型: RedissonSpinLock");
}
try{ Thread.sleep( 2*1000 ); } catch (Exception e) {}
for (int i=1; i<=3; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}
测试结果如下所示,符合预期
RedissonCountDownLatch 分布式闩锁
RedissonCountDownLatch是一个分布式的CountDownLatch闩锁。比如我们的业务系统会依赖很多其他基础服务,这样在业务系统启动过程中,需要等待其他基础服务全部启动完毕。示例代码如下所示
/**
* RedissonCountDownLatch Demo : 分布式闩锁
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonCountDownLatchDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static RedissonClient redissonClient;
@BeforeClass
public static void init() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
redissonClient = Redisson.create( config );
}
@Test
public void test1() throws InterruptedException {
final String countDownLatchName = "systemInit";
int count = 5;
for (int i=1; i<=count; i++) {
String serviceName = "基础服务 #"+i;
BasicService basicService = new BasicService(serviceName, redissonClient, countDownLatchName, count);
threadPool.execute( basicService );
}
RedissonCountDownLatch countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName);
countDownLatch.trySetCount(count);
// 阻塞等待基础服务全部启动完成
countDownLatch.await();
info("系统初始化已完成, 业务系统启动 ...");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
/**
* 基础服务
*/
private static class BasicService implements Runnable {
private String serviceName;
private RedissonCountDownLatch countDownLatch;
public BasicService(String serviceName, RedissonClient redissonClient, String countDownLatchName, Integer count) {
this.serviceName = serviceName;
this.countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName);
this.countDownLatch.trySetCount( count );
}
@Override
public void run() {
try{
info(serviceName + ": 启动中");
// 模拟 基础服务启动 耗时
Thread.sleep( RandomUtils.nextLong(1, 5) * 1000 );
countDownLatch.countDown();
info(serviceName + ": 启动完成");
} catch (Exception e) {
System.out.println( serviceName + ": Happen Exception: " + e.getMessage());
}
}
}
}
测试结果如下所示,符合预期
RedissonSemaphore 分布式信号量
RedissonSemaphore是一个分布式的信号量,示例代码如下所示
/**
* RedissonSemaphore Demo : 分布式信号量
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonSemaphoreDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellAnimal";
// 系统最大并发处理量
int maxLimit = 3;
IntStream.rangeClosed(1,8)
.mapToObj( num -> new UserReq("用户#"+num, redissonClient, lockName, maxLimit) )
.forEach( threadPool::execute );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
private static class UserReq implements Runnable {
private String name;
private RedissonSemaphore semaphore;
public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) {
this.name = name;
this.semaphore = (RedissonSemaphore) redissonClient.getSemaphore(lockName);
// 设置信号量的许可数
semaphore.trySetPermits( maxLimit );
}
@Override
public void run() {
try {
// 模拟用户不定时发起请求
Thread.sleep(RandomUtils.nextLong(500, 2000));
info( name + ": 发起请求" );
// 阻塞等待,直到获取许可
semaphore.acquire();
info(name + ": 系统开始处理请求");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextInt(5, 20)*1000);
// 用户请求处理完毕,释放许可
semaphore.release();
info(name + ": 系统处理完毕");
}catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
}
}
测试结果如下所示,符合预期
RedissonPermitExpirableSemaphore 分布式支持有效期的信号量
相比较于RedissonSemaphore而言,RedissonPermitExpirableSemaphore在获取许可的acquire方法中,增加了一个支持leaseTime参数的重载版本。以实现指定许可的最大持有时间。一旦业务持许可时间超过leaseTime参数值,则其持有的许可会被自动释放。但需要注意的是某个线程的许可一旦被自动释放后,此时再调用release方法来释放许可时,即会抛出异常。原因也很简单,因为此时线程实际上并未持有许可,示例代码如下所示。示例代码如下所示
/**
* RedissonPermitExpirableSemaphore Demo : 分布式支持有效期的信号量
* @author Aaron Zhu
* @date 2022-04-04
*/
public class RedissonPermitExpirableSemaphoreDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Test
public void test1() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create( config );
final String lockName = "sellMilk";
// 系统最大并发处理量
int maxLimit = 3;
IntStream.rangeClosed(1,8)
.mapToObj( num -> new UserReq("用户 #"+num, redissonClient, lockName, maxLimit) )
.forEach( threadPool::execute );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
redissonClient.shutdown();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
private static class UserReq implements Runnable {
private String name;
private RedissonPermitExpirableSemaphore semaphore;
public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) {
this.name = name;
this.semaphore = (RedissonPermitExpirableSemaphore) redissonClient.getPermitExpirableSemaphore(lockName);
// 设置信号量的许可数
semaphore.trySetPermits( maxLimit );
}
@Override
public void run() {
try {
// 模拟用户不定时发起请求
if( !name.equals("用户 #1") ) {
Thread.sleep( RandomUtils.nextLong(1000, 2000) );
}
info( name + ": 发起请求" );
// 阻塞等待直到获取许可, 指定信号量的最大持有时间为2秒
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
info(name + ": 系统开始处理请求");
// 模拟业务耗时
if( name.equals("用户 #1") ) {
Thread.sleep( 5 * 1000 );
} else {
Thread.sleep(RandomUtils.nextInt(500, 1000));
}
// 用户请求处理完毕,释放许可
semaphore.release(permitId);
info(name + ": 系统处理完毕");
}catch (Exception e) {
info( name + ": Happen Exception: " + e.getCause().getMessage());
}
}
}
}
测试结果如下所示,符合预期