Redis 分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!
共 5844字,需浏览 12分钟
·
2022-02-27 00:22
来源:juejin.cn/post/6854573212831842311
事故现场
public SeckillActivityRequestVO seckillHandle(SeckillActivityRequestVO request) {
SeckillActivityRequestVO response;
String key = "key:" + request.getSeckillId;
try {
Boolean lockFlag = redisTemplate.opsForValue().setIfAbsent(key, "val", 10, TimeUnit.SECONDS);
if (lockFlag) {
// HTTP请求用户服务进行用户相关的校验
// 用户活动校验
// 库存校验
Object stock = redisTemplate.opsForHash().get(key+":info", "stock");
assert stock != null;
if (Integer.parseInt(stock.toString()) <= 0) {
// 业务异常
} else {
redisTemplate.opsForHash().increment(key+":info", "stock", -1);
// 生成订单
// 发布订单创建成功事件
// 构建响应VO
}
}
} finally {
// 释放锁
stringRedisTemplate.delete("key");
// 构建响应VO
}
return response;
}
事故原因
事故分析
仔细分析下来,可以发现,这个抢购接口在高并发场景下,是有严重的安全隐患的,主要集中在三个地方:
没有其他系统风险容错处理
由于用户服务吃紧,网关响应延迟,但没有任何应对方式,这是超卖的导火索。
看似安全的分布式锁其实一点都不安全
虽然采用了set key value [EX seconds] [PX milliseconds] [NX|XX]的方式,但是如果线程A执行的时间较长没有来得及释放,锁就过期了,此时线程B是可以获取到锁的。当线程A执行完成之后,释放锁,实际上就把线程B的锁释放掉了。这个时候,线程C又是可以获取到锁的,而此时如果线程B执行完释放锁实际上就是释放的线程C设置的锁。这是超卖的直接原因。
非原子性的库存校验
解决方案
知道了原因之后,我们就可以对症下药了。
实现相对安全的分布式锁
public void safedUnLock(String key, String val) {
String luaScript = "local in = ARGV[1] local curr=redis.call('get', KEYS[1]) if in==curr then redis.call('del', KEYS[1]) end return 'OK'"";
RedisScript<String> redisScript = RedisScript.of(luaScript);
redisTemplate.execute(redisScript, Collections.singletonList(key), Collections.singleton(val));
}
我们通过LUA脚本来实现安全地解锁。
实现安全的库存校验
// redis会返回操作之后的结果,这个过程是原子性的
Long currStock = redisTemplate.opsForHash().increment("key", "stock", -1);
改进之后的代码
public SeckillActivityRequestVO seckillHandle(SeckillActivityRequestVO request) {
SeckillActivityRequestVO response;
String key = "key:" + request.getSeckillId();
String val = UUID.randomUUID().toString();
try {
Boolean lockFlag = distributedLocker.lock(key, val, 10, TimeUnit.SECONDS);
if (!lockFlag) {
// 业务异常
}
// 用户活动校验
// 库存校验,基于redis本身的原子性来保证
Long currStock = stringRedisTemplate.opsForHash().increment(key + ":info", "stock", -1);
if (currStock < 0) { // 说明库存已经扣减完了。
// 业务异常。
log.error("[抢购下单] 无库存");
} else {
// 生成订单
// 发布订单创建成功事件
// 构建响应
}
} finally {
distributedLocker.safedUnLock(key, val);
// 构建响应
}
return response;
}
分布式锁有必要么
改进之后,其实可以发现,我们借助于redis本身的原子性扣减库存,也是可以保证不会超卖的。对的。但是如果没有这一层锁的话,那么所有请求进来都会走一遍业务逻辑,由于依赖了其他系统,此时就会造成对其他系统的压力增大。这会增加的性能损耗和服务不稳定性,得不偿失。基于分布式锁可以在一定程度上拦截一些流量。
分布式锁的选型
有人提出用RedLock来实现分布式锁。RedLock的可靠性更高,但其代价是牺牲一定的性能。在本场景,这点可靠性的提升远不如性能的提升带来的性价比高。如果对于可靠性极高要求的场景,则可以采用RedLock来实现。
再次思考分布式锁有必要么
// 通过消息提前初始化好,借助ConcurrentHashMap实现高效线程安全
private static ConcurrentHashMap<Long, Boolean> SECKILL_FLAG_MAP = new ConcurrentHashMap<>();
// 通过消息提前设置好。由于AtomicInteger本身具备原子性,因此这里可以直接使用HashMap
private static Map<Long, AtomicInteger> SECKILL_STOCK_MAP = new HashMap<>();
...
public SeckillActivityRequestVO seckillHandle(SeckillActivityRequestVO request) {
SeckillActivityRequestVO response;
Long seckillId = request.getSeckillId();
if(!SECKILL_FLAG_MAP.get(requestseckillId)) {
// 业务异常
}
// 用户活动校验
// 库存校验
if(SECKILL_STOCK_MAP.get(seckillId).decrementAndGet() < 0) {
SECKILL_FLAG_MAP.put(seckillId, false);
// 业务异常
}
// 生成订单
// 发布订单创建成功事件
// 构建响应
return response;
}