从零到一编码实现Redis分布式锁

共 7732字,需浏览 16分钟

 ·

2021-12-18 11:47

: 不是有redission等现成工具吗?咋不用?
: 不,我就想自己写一个!

陈建斌说 : 你这个男的怎么回事 ?!

有的同学,就是这么尿性。也能理解,不自己弄一下,怎么能理解透彻,那就一起来搞一下呗!

使用场景和选型

分布式多节点的部署方式,使得共享变量有可能被同时操作,遇到有数据一致性要求的情况,就需要采取全局锁定的措施来保障并发操作下的一致性要求,如,库存扣减操作、同一个商品的上下架和更新操作等等。

常见的,分布式锁采用ZookeeperRedis来实现。怎么取舍呢?


zookeeper

redis

加锁原理

创建节点,节点已存在时创建失败

插入数据,数据已存在则设置失败

过期保护

节点类型为临时节点,断连删除

设置过期时间,到期删除

优点

在加锁失败时,zk的注册通知更优雅

速度快,性能高

缺点

只有leader负责写,然后通知flower,性能较差

抢锁失败时,需要自旋循环尝试

生产环境下,性能往往被优先考虑,相比较各自的优缺点,综合考虑,我们一般更倾向于redis。

从0到1 实现分布式锁

step1: 加锁 和 解锁的基础能力构建

Jedis.set(key, value, params) 👏🏻
这个2.6之后新增的加强版set命令是真不错,解决了加锁时设置锁超时时间的原子诉求,防止服务宕机导致的死锁~

(1) 一个具有加锁解锁功能的分布式锁对象,最少要有 jedis客户端 、 对应的redis key 、 锁超时时间 :

//构建分布式锁对象
public class DistributedLock {
    private Jedis jedis;
    private String lockName;
    private long lockExpireSecond;

    public DistributedLock(Jedis jedis, String lockName, long lockExpireSecond) {
        this.jedis = jedis;
        this.lockName = lockName;
        this.lockExpireSecond = lockExpireSecond;
    }
}

(2) 利用jedis提供的SetParams ,对NX , PX 在jedis.set操作中一次性的原子的完成设置:

public void lock() throws BizException {
    String lockResult = null;
    try {
       //设置 NX PX 参数
       SetParams params = new SetParams();
       params.nx();
       params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
       //执行加锁 , value 暂定 为固定字符串
       lockResult = this.jedis.set(this.lockName, "lockValue", params);

   } catch (Exception e) {
       LOG.error("lock error",e);
   }

   if ("OK".equals(lockResult)) {
       LOG.debug("locked success,lockName:{}",lockName);
   } else {
      throw new BizException("Get lock failed.");
   }
}

(3) 用jedis.del命令完成解锁:

 public boolean unlock() {
    boolean unlockResult=false;

   try {
       this.jedis.del(this.lockName);
       unlockResult=true;
   }catch (Exception e){
      LOG.error("unLock error",e);
   }
    return unlockResult;
}

step2: 加锁失败直接结束? 希望多试几次

从上面的构造函数和lock()实现,发现当前实现属于一锤子买卖,不成功便成仁。这其实不太满足我们的生产需求,很多场景下,业务执行速度是很快的,只要稍微等一等,就可以。那怎么办?

自定义重试次数和等待间隔,有限重试等待

//新增重试间隔属性
private long retryIntervalTime; 

//通过构造方法初始化重试间隔
public DistributedLock(Jedis jedis, String lockName, long lockExpireSecond, long retryIntervalTime) {
   ...略
   this.retryIntervalTime = retryIntervalTime;
}

//新增入参,加锁超时时间
public void lock(long timeout,TimeUnit unit) throws TimeoutException {
   String lockResult = null;
   try {
       //设置 NX PX 参数
       SetParams params = new SetParams();
       params.nx();
       params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
            
      //加锁开始时间
      long startTime=System.nanoTime();
            
      //循环有限等待
      while (!"OK".equals(lockResult=this.jedis.set(this.lockName, "lockValue", params))&&!isTimeout(startTime,unit.toNanos(timeout))){
           Thread.sleep(retryIntervalTime);
     }

  } catch (Exception e) {
      LOG.error("lock error",e);
  }
        
  //修改抛出异常类型为超时异常
  if ("OK".equals(lockResult)) {
       LOG.debug("locked success,lockName:{}",lockName);
  } else {
      throw new TimeoutException("Get lock failed because of timeout.");
  }
}

step3: 只能解自己加的锁,别人的锁不能乱动

考虑一个问题:我们为了防止加锁后机器宕机的情况,给锁设置了过期时间,以此来保障锁可以在服务节点宕机不能解锁时,也可以给后续业务提供锁操作。

参考《How to do distributed locking》

上图中,因为业务执行时间的不可控(或者遇到GC等不可预期的停顿),给分布式锁带来了使用问题。

我们先看问题一:用户线程1 把 线程2的锁释放了!怎么办呢?

加锁保存线程标识,解锁校验,非自己的锁不释放

//其他属性略,新增lockOwner标识
private String lockOwner;

//通过构造函数初始化lockOwner标识 
public DistributedLock(Jedis jedis, String lockName, String lockOwner, long lockExpireSecond, long retryIntervalTime) {
    ...略
    this.lockOwner = lockOwner;
}

public void lock(long timeout,TimeUnit unit) throws TimeoutException {
   String lockResult = null;
   try {
      //设置 NX PX 参数
      SetParams params = new SetParams();
      params.nx();
      params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
            
      //加锁开始时间
      long startTime=System.nanoTime();
            
      // set时的value 改为 lockOwner
     while (!"OK".equals(lockResult=this.jedis.set(this.lockName, this.lockOwner, params))&&!isTimeout(startTime,unit.toNanos(timeout))){
         Thread.sleep(retryIntervalTime);
      }
   } catch (Exception e) {
       LOG.error("lock error",e);
   }
   ...略
}
    
public boolean unlock() {
    boolean unlockResult=false;
    try {
       // 先getValue ,并和当前lockOwner匹配,匹配上才去解锁
        if (this.lockOwner.equals(this.jedis.get(this.lockName))) {
           this.jedis.del(this.lockName);
           unlockResult = true;
       }
   }catch (Exception e){
        LOG.error("unLock error",e);
  }
    return unlockResult;
}

有的同学说,这个解锁的地方,需要用lua包成原子操作。单从功能上来讲,上面的实现也是OK的,因为只有get到的结果和本身匹配,才会进行下述操作。包成lua脚本的目的,应该主要是为了减少一次传输,提高执行效率。

step4: expire时间不够产生并发冲突

也就是之前的图中的问题二:线程1 还在执行中,锁就过期释放了,导致线程2也加锁成功,这直接导致了线程间的业务冲突。怎么办呢?

锁持有期内,根据需要,动态延长锁的过期时间

触发锁延期的方案选型,也是个大事,jdk原生timer调度线程池nettyTimer都可以实现,选哪个好?

综合对比精度、资源消耗等方面,Netty中采用时间轮算法的Timer应该是首选,都能管理成千上万的连接、调度心跳检测,拿来搞个锁延期还不是手拿把掐?


首先,需要构建一个全局的Timer来存储和调度任务其次,在加锁成功之后添加定时触发任务再次,延期操作时,需要校验当前线程是否还持有锁最后,在解锁时,取消定时任务注意点,任务需要循环注册 , 考虑线程被中断的情况


构建分布式锁上下文,用于存储全局时间轮调度器:

public class LockContext {

    private HashedWheelTimer timer;

    private LockContext(){
        //时间轮参数可以从业务自己的配置获取
        // long tickDuration=(Long) config.get("tickDuration");
        // int tickPerWheel=(int) config.get("tickPerWheel"); //默认1024
        // boolean leakDetection=(Boolean)config.get("leakDetection");
        timer = new HashedWheelTimer(new DefaultThreadFactory("distributedLock-timer",true), 10, TimeUnit.MILLISECONDS, 1024false);
    }

通过构造函数,将上下文及调度器传入分布式锁对象:

public class DistributedLock {
    //上下文
    private LockContext context;
    //当前持有的 Timer 调度对象
    private volatile Timeout  lockTimeout;

    public DistributedLock(Jedis jedis, String lockName, String lockOwner, long lockExpireSecond, long retryIntervalTime, LockContext context) {
         ...其他属性略
        this.context = context;
    }

加锁成功之后,执行调度器注册操作:

public void lock(long timeout, TimeUnit unit) throws TimeoutException {
    //...加锁 略
    
   if ("OK".equals(lockResult)) {
       LOGGER.info("locked success,lockName:{}",lockName);
       try {
           //注册循环延期事件
           registerLoopReExpire();
       }finally {
           if (Thread.currentThread().isInterrupted()&&this.lockTimeout!=null){
               LOGGER.warn("线程中断,定时任务取消");
               this.lockTimeout.cancel();
           }
       }
   } else {
       throw new TimeoutException("Get lock failed because of timeout.");
    }
}

方法registerLoopReExpire()中是实际的任务注册和延期操作:

private void registerLoopReExpire() {
    LOGGER.info("分布式锁延期任务注册");

    //每次注册,都把timeout赋给当前锁对象,用于后续解锁时取消
    this.lockTimeout = context.getTimer().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
        
            //校验是否还在持有锁,并延长过期时间
            boolean isReExpired=reExpireLock(lockName,lockOwner);
        
            if (isReExpired) {
                //自己调自己,循环注册
                registerLoopReExpire();
            }else {
                lockTimeout.cancel();
            }
        }
    }, TimeUnit.SECONDS.toMillis( lockExpireSecond)/2, TimeUnit.MILLISECONDS);

    LOGGER.info("分布式锁延期任务注册完成");
}

这里有几个点需要重点关注:
newTimeout()操作会返回一个Timeout实体,我们需要依赖该实体来对当前任务进行管理,所以需要赋值给锁内部对象。
锁延期,需要根据lockOwner 和 lockName来判断,持有锁才加锁,需要使用lua方式来保证判断和执行的原子性。
执行完延期操作之后,需要根据结果进行后续处理,成功则继续注册,失败则取消当前任务。定时任务的执行时间,应该要小于锁的过期时间,取过期时间的1/21/3或自定义传入。
来验证一下,我们设置 锁的过期时间为3秒,业务执行时间为10秒 ,执行:
可以看到,定时任务一共延期了6次,最后一次注册成功了,但是业务执行完随着解锁任务取消了。

总结和回顾

本文,我们从0到1的对分布式锁进行了编码实现。从基本能力,最后到生产环境的各种诉求基本都进行了填充和完善。
值得一提的是,除了延期功能,上述大部分能力都是经历过生产环境考验的。大家如果发现了延期功能的实现有什么问题,欢迎留言纠正,一起讨论进步。
当然,上述内容还有缺失,比如jedis 操作lua脚本的延期实现,可重入锁的改造,由于篇幅原因就不都贴出来了,有兴趣的同学也可以按上述思路继续完善。
另外,我们的上述实现都是基于主从架构,因此,分布式锁有可能会在主从切换或者其他宕机场景出现异常,但是个人认为,用牺牲效率来保障稳定的redLock,在大部分场景下其实没有什么必要。关于这部分,其实有几个号主讲述的非常到位,可以搜来看看。
最后,当我们对照redisson的分布式锁实现,来回看我们自己的实现,其实就会发现,在主逻辑上的实现,其实是大同小异的,只是redission的在可重入、效率兼顾(netty架构的运用)等方面要更加完备。
纸上得来终觉浅~ 共勉~

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号


好文章,我在看❤️

浏览 40
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报