【329期】如何利用redis分布式锁,解决秒杀场景下的订单超卖问题
共 17934字,需浏览 36分钟
·
2021-09-14 21:21
1. 秒杀场景
Controller层:
@RestController
@RequestMapping("/skill")
@Slf4j
public class SecKillController {
@Autowired
private SecKillService secKillService;
//查询秒杀活动特价商品的信息
@GetMapping("/query/{productId}")
public String query(@PathVariable String productId)throws Exception {
return secKillService.querySecKillProductInfo(productId);
}
//秒杀
@GetMapping("/order/{productId}")
public String skill(@PathVariable String productId)throws Exception {
log.info("@skill request, productId:" + productId);
secKillService.orderProductMockDiffUser(productId);
return secKillService.querySecKillProductInfo(productId);
}
}
Service层:
@Service
public class SecKillServiceImpl implements SecKillService {
private static final int TIMEOUT = 10 * 1000; //超时时间 10s
@Autowired
private RedisLock redisLock;
// 雅诗兰黛特价小棕瓶,限量100000份
static Map<String,Integer> products;
static Map<String,Integer> stock;
static Map<String,String> orders;
static {
//模拟多个表,商品信息表,库存表,秒杀成功订单表
products = new HashMap<>();
stock = new HashMap<>();
orders = new HashMap<>();
//商品Id---商品库存
products.put("123456", 100000);
//商品id---商品库存
stock.put("123456", 100000);
}
private String queryMap(String productId) {
return "雅诗兰黛小棕瓶特价,限量份"
+ products.get(productId)
+" 还剩:" + stock.get(productId)+" 份"
+" 该商品成功下单用户数目:"
+ orders.size() +" 人" ;
}
@Override
public String querySecKillProductInfo(String productId) {
return this.queryMap(productId);
}
//秒杀的逻辑:可以在该方法生加上Synchronized解决超卖
@Override
public void orderProductMockDiffUser(String productId) {
//1.查询该商品库存,为0则活动结束。
int stockNum = stock.get(productId);
if(stockNum == 0) {
throw new SellException(100,"活动结束");
}else {
//2.下单(模拟不同用户openid不同)
orders.put(KeyUtil.genUniqueKey(),productId);
//3.减库存
stockNum =stockNum-1;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//4.更新库存
stock.put(productId,stockNum);
}
}
}
使用压测工具测试结果:雅诗兰黛小棕瓶特价,限量份100000 还剩:99938份 该商品成功下单用户数目:646人
很明显订单超卖了,如何解决?可以在秒杀方法上加上Synchronized,但是只适合单点服务器,且性能低
2. Redis分布式锁解决订单超卖
2.1 两个命令介绍
业务场景:
天猫双11热卖过程中,对已经售罄的货物追加补货,且补货完成。客户购买热情高涨, 3秒内将所有商品购买完毕。本次补货已经将库存全部清空,如何避免最后一件商品不被多人同时购买?【超卖问题】就是说如果只剩一件商品,但是有5个人要买,如何保证不被超卖???
业务分析:
使用watch监控一个key有没有改变已经不能解决问题,此处要监控的是具体数据 ,我们要监控的是商品的数量什么时候到1,这个商品的数量是一直变化的,不可能别每次变化,都放弃执行。
解决方案:
使用 setnx 设置一个公共锁:setnx lock-key value
,操作完毕通过del操作释放锁
利用setnx命令的返回值特征,有值则返回设置失败,无值则返回设置成功
对于返回设置成功的,拥有控制权进行下一步的具体业务操作,对于返回设置失败的,不具有控制权,排队或等待
127.0.0.1:6379> set num 10
OK
127.0.0.1:6379> setnx lock-num 1 -- 加锁
(integer) 1
127.0.0.1:6379> incrby num -1
(integer) 9
127.0.0.1:6379> del lock-num -- 释放锁
(integer) 1
127.0.0.1:6379> setnx lock-num 1 -- 当前客户端加锁
(integer) 1
127.0.0.1:6379> setnx lock-num 1 -- 其他客户端获取不到锁
(integer) 0
死锁: 如果加了锁,但是没有释放,就会导致死锁,其他客户端一直获取不到锁。
使用expire为锁key添加时间限定,到时间不释放,放弃锁
由于操作通常都是微秒或毫秒级,因此该锁定时间不宜设置过大。具体时间需要业务测试后确认。推荐:Java进阶视频资源
expire lock-key second pexpire lock-key milliseconds
127.0.0.1:6379> set name 123
OK
127.0.0.1:6379> setnx lock-name 1 -- 锁的名称key
(integer) 1
127.0.0.1:6379> expire lock-name 20 -- 使用expire为锁key添加时间限定
(integer) 1
127.0.0.1:6379> get name
"123"
GETSET key value
将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
redis> GETSET db mongodb # 没有旧值,返回 nil
(nil)
redis> GET db
"mongodb"
redis> GETSET db redis # 返回旧值 mongodb
"mongodb"
redis> GET db
"redis"
2.2 RedisLock
@Component
@Slf4j
public class RedisLock {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加锁
* @param key:productId
* @param value 当前时间+超时时间
*/
public boolean lock(String key, String value) {
//setnx----对应方法 setIfAbsent(key, value),如果可以加锁返回true,不可以加锁返回false
if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
//下面这段代码时为了解决可能出现的死锁情况
String currentValue = redisTemplate.opsForValue().get(key);
//如果锁过期
if (!StringUtils.isEmpty(currentValue)
&& Long.parseLong(currentValue) < System.currentTimeMillis()) {
//获取上一个锁的时间:重新设置锁的过期时间value,并返回上一个过期时间
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
//currentValue =2020-12-28,两个线程的value=2020-12-29,只会有一个线程拿到锁
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
return false;
}
//解锁
public void unlock(String key, String value) {
try {
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
redisTemplate.opsForValue().getOperations().delete(key);
}
}catch (Exception e) {
log.error("【redis分布式锁】解锁异常, {}", e);
}
}
}
2.3 将redis分布式锁应用于秒杀业务
@Override
public void orderProductMockDiffUser(String productId) {
//加锁
//锁的过期时间为当前时间+过期时长
long time = System.currentTimeMillis()+TIMEOUT;
if(!redisLock.lock(productId,String.valueOf(time))){
throw new SellException(101,"人太多,稍后再来");
}
//1.查询该商品库存,为0则活动结束。
int stockNum = stock.get(productId);
if(stockNum == 0) {
throw new SellException(100,"活动结束");
}else {
//2.下单(模拟不同用户openid不同)
orders.put(KeyUtil.genUniqueKey(),productId);
//3.减库存
stockNum =stockNum-1;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//4.更新库存
stock.put(productId,stockNum);
}
//解锁
redisLock.unlock(productId,String.valueOf(time));
}
2.4 分析RedisLock
重点分析加锁逻辑,有两个逻辑需要考虑:
public boolean lock(String key, String value) {
if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
//下面的代码是为了解决可能出现的死锁的情况????
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue)
&& Long.parseLong(currentValue) < System.currentTimeMillis()) {
//下面这个逻辑又怎么理解????
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
return false;
}
为了解决可能出现的死锁的情况????
//秒杀业务方法
@Override
public void orderProductMockDiffUser(String productId) {
//加锁
//锁的过期时间为当前时间+过期时长
long time = System.currentTimeMillis()+TIMEOUT;
if(!redisLock.lock(productId,String.valueOf(time))){
throw new SellException(101,"人太多,稍后再来");
}
//1.查询该商品库存,为0则活动结束。
int stockNum = stock.get(productId);
if(stockNum == 0) {
throw new SellException(100,"活动结束");
}else {
//2.下单
orders.put(KeyUtil.genUniqueKey(),productId);
//3.减库存
stockNum =stockNum-1;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//4.更新库存
stock.put(productId,stockNum);
}
//解锁
redisLock.unlock(productId,String.valueOf(time));
}
假如我们将中间那段逻辑去掉会出现声明情况???
public boolean lock(String key, String value) {
if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
return false;
}
① 线程A执行秒杀的业务逻辑方法,并对这个方法加了锁,key=proeuctId,value=加锁时间+过期时长,然后开始执行下单----》减库存-----》更新库存等操作,如果在执行的过程中,这段代码发生了异常,那么线程A是不会释放锁的,导致其他线程都无法获取到锁导致死锁的产生,所以下面的逻辑是很有必要加的,即如果当前时间晚于锁的过期时间,那么就会向下走if()条件:
//下面的代码是为了解决可能出现的死锁的情况????
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue)
&& Long.parseLong(currentValue) < System.currentTimeMillis()) {
//下面这个逻辑又怎么理解????
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
② 可是下面的if()条件怎么理解?
currentValue=2020-12-18
假如现在两个线程A和B同时执行lock()方法,也就是这两个线程的value是完全相同的,都为value=2020-12-19,而他们都执行 String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
,会有一个先执行一个后执行:
假如线程A先执行,返回的oldValue=2020-12-18
,同时设置value = 2020-12-19
,由于oldvalue=currentValue
返回true,即A线程加了锁;
此时B线程继续执行 ,返回的oldValue=2020-12-19,oldvalue!=currentValue
,返回false,加锁失败
所以这段代码的逻辑是只会让一个线程加锁
public boolean lock(String key, String value) {
if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue)
&& Long.parseLong(currentValue) < System.currentTimeMillis()) {
//下面这个逻辑又怎么理解????
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
return false;
}
感谢阅读,希望对你有所帮助 :)
来源:hengheng.blog.csdn.net/article/details/107827649
最近给大家找了 通用权限系统
资源,怎么领取?
扫二维码,加我微信,回复:通用权限系统
注意,不要乱回复 没错,不是机器人 记得一定要等待,等待才有好东西