redission 分布式锁
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | 穆穆兔兔
来源 | urlify.cn/RrAR3u
概述
分布式系统有一个著名的理论CAP,指在一个分布式系统中,最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。所以在设计系统时,往往需要权衡,在CAP中作选择。当然,这个理论也并不一定完美,不同系统对CAP的要求级别不一样,选择需要考虑方方面面。
在微服务系统中,一个请求存在多级跨服务调用,往往需要牺牲强一致性老保证系统高可用,比如通过分布式事务,异步消息等手段完成。但还是有的场景,需要阻塞所有节点的所有线程,对共享资源的访问。比如并发时“超卖”和“余额减为负数”等情况。
本地锁可以通过语言本身支持,要实现分布式锁,就必须依赖中间件,数据库、redis、zookeeper等。
分布式锁特性
不管使用什么中间件,有几点是实现分布式锁必须要考虑到的。
互斥:互斥好像是必须的,否则怎么叫锁。
死锁: 如果一个线程获得锁,然后挂了,并没有释放锁,致使其他节点(线程)永远无法获取锁,这就是死锁。分布式锁必须做到避免死锁。
性能: 高并发分布式系统中,线程互斥等待会成为性能瓶颈,需要好的中间件和实现来保证性能。
锁特性:考虑到复杂的场景,分布式锁不能只是加锁,然后一直等待。最好实现如Java Lock的一些功能如:锁判断,超时设置,可重入性等。
Redis实现之Redisson原理
redission实现了JDK中的Lock接口,所以使用方式一样,只是Redssion的锁是分布式的。如下:
RLock lock = redisson.getLock("className");
lock.lock();
try {
// do sth.
} finally {
lock.unlock();
}
好,Lock主要实现是RedissionLock。
先来看常用的Lock方法实现。
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
再看lockInterruptibly
方法:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) { // 获取成功
return;
}
// 异步订阅redis chennel
RFuture future = subscribe(threadId);
commandExecutor.syncSubscription(future); // 阻塞获取订阅结果
try {
while (true) {// 循环判断知道获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);// 取消订阅
}
}
总结lockInterruptibly
:获取锁,不成功则订阅释放锁的消息,获得消息前阻塞。得到释放通知后再去循环获取锁。
下面重点看看如何获取锁:Long ttl = tryAcquire(leaseTime, unit, threadId)
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));// 通过异步获取锁,但get(future)实现同步
}
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) { //1 如果设置了超时时间,直接调用 tryLockInnerAsync
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//2 如果leaseTime==-1,则默认超时时间为30s
RFuture ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
//3 监听Future,获取Future返回值ttlRemaining(剩余超时时间),获取锁成功,但是ttlRemaining,则刷新过期时间
ttlRemainingFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
已经在注释中解释了,需要注意的是,此处用到了Netty的Future-listen模型,可以看看我的另一篇对Future的简单讲解:给Future一个Promise。
下面就是最重要的redis获取锁的方法tryLockInnerAsync
:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(
getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.