分布式锁的三种实现方式

JAVA不归路

共 7691字,需浏览 16分钟

 ·

2023-08-10 07:06

分布式锁的三种实现方式

为什么要使用分布式锁

使用分布式锁的目的,无外乎就是保证同一时间只有一个客户端可以对共享资源进行操作。

zookeeper可靠性比redis强太多,只是效率低了点,如果并发量不是特别大,追求可靠性,首选zookeeper。为了效率,则首选redis实现。

1.数据库方式

/** * 该类实现Lock类,用于分布式锁(也可以不实现该类,自己添加lock和unlock方法) * 实现思路: * 1.在数据库中建立一张表,创建一个属性唯一的字段 * 2.当有用户访问时,先查询该字段是否有指定的值, * 如果有则说明该数据正在被访问,拒绝其他用户进行写操作 * 如果没有则向数据库添加指定的值(上锁) * 3.该用户用完需要的数据后删除表中添加的指定数据(解锁) */public class MysqlLock implements Lock {    @Autowired    TestLockMapper testLockMapper;    private static final String FLAG = "isLock";
/** * 上锁就是向数据库中加上一条数据,该字段设置为唯一 * 当其他人想上锁时发现该数据有人占用就会等待 */ @Override public void lock() { while(true){ //循环直到获取到锁 boolean b = tryLock(); //如果锁被占用则获取不到 if(b){ TestLock testLock = new TestLock(); testLock.setFlag(FLAG); testLockMapper.insert(testLock); break; }else { System.out.println("该数据被占用...请等待。。。"); } } }
@Override public void lockInterruptibly() throws InterruptedException {
}
/** * 尝试获取锁,如果数据库中有该值,则获取不到锁 * @return */ @Override public boolean tryLock() { QueryWrapper<TestLock> wrapper = new QueryWrapper<>(); wrapper.eq("flag",FLAG); TestLock testLock = testLockMapper.selectOne(wrapper); if(testLock==null){ return true; } return false; }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; }
/** * 释放锁,将上锁时设置的数据删除,其他请求可以设置数据来得到锁 */ @Override public void unlock() { QueryWrapper<TestLock> wrapper = new QueryWrapper<>(); wrapper.eq("flag",FLAG); testLockMapper.delete(wrapper); }
@Override public Condition newCondition() { return null; }}

2.redis方式

死锁问题:当进行lock方法上锁以后,在执行操作时有异常而无法执行unlock释放锁

解决死锁:在redis中可以通过expire设置数据的过期时间,指定的时间内即使我们不释放锁也会自动删除

redis分布式锁实现基于setnt(set if not exists),设置成功返回1,失败返回0,释放锁通过del指令完成

/** * 实现思路 * 基本和mysql一致 * 在redis中使用命令判断该值是否存在,存在则等待,不存在则设置值,获取锁 * 当操作完成后删除值并且释放锁 */public class RedisLock {    @Autowired    RedisTemplate redisTemplate;    private static final String NAME = "lock";    private static final String FLAG = "isLock";    public void lock(){        while (true){        //向redis中设置指定的key/value,       //该方法有可能导致死锁        //    Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG);       //设置过期时间为 1 分钟 ,即使1分钟后没有人释放锁他也会自动消失            Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG,1, TimeUnit.MINUTES);            if (b){                return ;            }else{                System.out.println(" 该数据被占用...请等待。。。");            }        }    }
public void unlock(){ redisTemplate.delete(NAME); }
}

3.zookeeper方式

zookeeper实现分布式锁:原理:有序临时节点+watch监听来实现

实现思路:为每一个执行的线程创建一个有序的临时节点,为了确保有序性,在创建完节点,会再获取全部节点,再重新进行—次排序,排序过程中,每个线程要判断自己剩下的临时节点的序号是否是显小的, 如果是最小的,将会获取到锁,执行相关操作,释放锁 如果不是最小的,会监听它的前一个节点,当它的前一个节点被删除时,它就会获得锁,依次类推

public class ZkLock {    //ZooKeeper客户端    private ZooKeeper zk;    //zk的一个目录结构locks,代表根目录    private String root ="/locks";    //锁的名称    private String lockName;    //当前线程创建的序列node    private ThreadLocal<String> nodeId = new ThreadLocal<>();    //用来同步等待zkclient链接到了服务端   //CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待    private CountDownLatch countDownLatch = new CountDownLatch(1);    //超时时间    private static final int sessionTimeout = 3000;    private final static byte[] data = new byte[0];    //在构造方法中连接zookeeper,创建根节点lock    public  ZkLock(String config,String lockName){        this.lockName = lockName;        try {            zk = new ZooKeeper(config, sessionTimeout, new Watcher() {//watcher用于监听                @Override                public void process(WatchedEvent event) {                    if(event.getState()==Event.KeeperState.SyncConnected){                        //递减锁存器的计数,如果计数到达零,则释放所有等待的线程。                        countDownLatch.countDown();                    }                }            });            //使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。            countDownLatch.await();            Stat stat = zk.exists(root,false);            if(null==stat){                //创建根节点                zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            }        } catch (Exception e) {            e.printStackTrace();        }    }
/** * 添加watch监听临时顺序节点的删除 */ class LockWatcher implements Watcher { private CountDownLatch latch = null; public LockWatcher(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { if(event.getType()== Event.EventType.NodeDeleted){ latch.countDown(); } } }
public void lock(){ try{ //创建临时子节点 String myNode = zk.create(root+"/"+lockName,data,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+myNode+"created"); //取出所有子节点,并排序 List<String> subNode = zk.getChildren(root,false); TreeSet<String> sortedNode = new TreeSet<>(); for (String node: subNode) { sortedNode.add(root+"/"+node); } String smallNode = sortedNode.first(); if(myNode.equals(smallNode)){ //如果是最小节点,则表示获得锁 this.nodeId.set(myNode); return; } String preNode = sortedNode.lower(myNode); CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode,new LockWatcher(latch));//同时注册监听 //判断比自己小一个数的节点是否存在,如果不存在则不等待锁,同时注册监听 if(stat != null){ latch.await();//等待其他线程释放锁 nodeId.set(myNode); latch = null; }
} catch (Exception e) { e.printStackTrace(); } }
public void unlock(){ try { if(null!= nodeId){ zk.delete(nodeId.get(),-1); } nodeId.remove();//释放锁 } catch (Exception e) { e.printStackTrace(); }
}
}

关于Redundant declaration:@SpringBootApplication already applies given @ComponentScan异常

1.@ComponentScan默认扫描使用该注解的类所在的包,包括这个包下的类和子包,所以如果没有配置basepackages,并且类都放在子包中,是可以正常访问的2.如果配置了@ComponentScn中的basepackages,那么就要把所有需要扫描的包都配置.这种情况下,@ComponentScan是不会再去扫描当前类所在的包的.之前我之所以以为@ComponentScan对启动类之外的包无能为力,就是因为配置了domain包,但是没有配controller类的包,导致程序无法访问.


浏览 52
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报