分布式锁的三种实现方式
共 7691字,需浏览 16分钟
·
2023-08-10 07:06
分布式锁的三种实现方式
为什么要使用分布式锁
使用分布式锁的目的,无外乎就是保证同一时间只有一个客户端可以对共享资源进行操作。
zookeeper可靠性比redis强太多,只是效率低了点,如果并发量不是特别大,追求可靠性,首选zookeeper。为了效率,则首选redis实现。
1.数据库方式
/**
* 该类实现Lock类,用于分布式锁(也可以不实现该类,自己添加lock和unlock方法)
* 实现思路:
* 1.在数据库中建立一张表,创建一个属性唯一的字段
* 2.当有用户访问时,先查询该字段是否有指定的值,
* 如果有则说明该数据正在被访问,拒绝其他用户进行写操作
* 如果没有则向数据库添加指定的值(上锁)
* 3.该用户用完需要的数据后删除表中添加的指定数据(解锁)
*/
public class MysqlLock implements Lock {
TestLockMapper testLockMapper;
private static final String FLAG = "isLock";
/**
* 上锁就是向数据库中加上一条数据,该字段设置为唯一
* 当其他人想上锁时发现该数据有人占用就会等待
*/
public void lock() {
while(true){ //循环直到获取到锁
boolean b = tryLock(); //如果锁被占用则获取不到
if(b){
TestLock testLock = new TestLock();
testLock.setFlag(FLAG);
testLockMapper.insert(testLock);
break;
}else {
System.out.println("该数据被占用...请等待。。。");
}
}
}
public void lockInterruptibly() throws InterruptedException {
}
/**
* 尝试获取锁,如果数据库中有该值,则获取不到锁
* @return
*/
public boolean tryLock() {
QueryWrapper<TestLock> wrapper = new QueryWrapper<>();
wrapper.eq("flag",FLAG);
TestLock testLock = testLockMapper.selectOne(wrapper);
if(testLock==null){
return true;
}
return false;
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
/**
* 释放锁,将上锁时设置的数据删除,其他请求可以设置数据来得到锁
*/
public void unlock() {
QueryWrapper<TestLock> wrapper = new QueryWrapper<>();
wrapper.eq("flag",FLAG);
testLockMapper.delete(wrapper);
}
public Condition newCondition() {
return null;
}
}
2.redis方式
死锁问题:当进行lock方法上锁以后,在执行操作时有异常而无法执行unlock释放锁
解决死锁:在redis中可以通过expire设置数据的过期时间,指定的时间内即使我们不释放锁也会自动删除
redis分布式锁实现基于setnt(set if not exists),设置成功返回1,失败返回0,释放锁通过del指令完成
/**
* 实现思路
* 基本和mysql一致
* 在redis中使用命令判断该值是否存在,存在则等待,不存在则设置值,获取锁
* 当操作完成后删除值并且释放锁
*/
public class RedisLock {
@Autowired
RedisTemplate redisTemplate;
private static final String NAME = "lock";
private static final String FLAG = "isLock";
public void lock(){
while (true){
//向redis中设置指定的key/value,
//该方法有可能导致死锁
// Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG);
//设置过期时间为 1 分钟 ,即使1分钟后没有人释放锁他也会自动消失
Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG,1, TimeUnit.MINUTES);
if (b){
return ;
}else{
System.out.println(" 该数据被占用...请等待。。。");
}
}
}
public void unlock(){
redisTemplate.delete(NAME);
}
}
3.zookeeper方式
zookeeper实现分布式锁:原理:有序临时节点+watch监听来实现
实现思路:为每一个执行的线程创建一个有序的临时节点,为了确保有序性,在创建完节点,会再获取全部节点,再重新进行—次排序,排序过程中,每个线程要判断自己剩下的临时节点的序号是否是显小的, 如果是最小的,将会获取到锁,执行相关操作,释放锁 如果不是最小的,会监听它的前一个节点,当它的前一个节点被删除时,它就会获得锁,依次类推
public class ZkLock {
//ZooKeeper客户端
private ZooKeeper zk;
//zk的一个目录结构locks,代表根目录
private String root ="/locks";
//锁的名称
private String lockName;
//当前线程创建的序列node
private ThreadLocal<String> nodeId = new ThreadLocal<>();
//用来同步等待zkclient链接到了服务端
//CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
private CountDownLatch countDownLatch = new CountDownLatch(1);
//超时时间
private static final int sessionTimeout = 3000;
private final static byte[] data = new byte[0];
//在构造方法中连接zookeeper,创建根节点lock
public ZkLock(String config,String lockName){
this.lockName = lockName;
try {
zk = new ZooKeeper(config, sessionTimeout, new Watcher() {//watcher用于监听
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected){
//递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
countDownLatch.countDown();
}
}
});
//使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
countDownLatch.await();
Stat stat = zk.exists(root,false);
if(null==stat){
//创建根节点
zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 添加watch监听临时顺序节点的删除
*/
class LockWatcher implements Watcher {
private CountDownLatch latch = null;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
public void process(WatchedEvent event) {
if(event.getType()== Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
public void lock(){
try{
//创建临时子节点
String myNode = zk.create(root+"/"+lockName,data,ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+myNode+"created");
//取出所有子节点,并排序
List<String> subNode = zk.getChildren(root,false);
TreeSet<String> sortedNode = new TreeSet<>();
for (String node: subNode) {
sortedNode.add(root+"/"+node);
}
String smallNode = sortedNode.first();
if(myNode.equals(smallNode)){
//如果是最小节点,则表示获得锁
this.nodeId.set(myNode);
return;
}
String preNode = sortedNode.lower(myNode);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(preNode,new LockWatcher(latch));//同时注册监听
//判断比自己小一个数的节点是否存在,如果不存在则不等待锁,同时注册监听
if(stat != null){
latch.await();//等待其他线程释放锁
nodeId.set(myNode);
latch = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void unlock(){
try {
if(null!= nodeId){
zk.delete(nodeId.get(),-1);
}
nodeId.remove();//释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
}
关于Redundant declaration:@SpringBootApplication already applies given @ComponentScan异常
1.@ComponentScan默认扫描使用该注解的类所在的包,包括这个包下的类和子包,所以如果没有配置basepackages,并且类都放在子包中,是可以正常访问的2.如果配置了@ComponentScn中的basepackages,那么就要把所有需要扫描的包都配置.这种情况下,@ComponentScan是不会再去扫描当前类所在的包的.之前我之所以以为@ComponentScan对启动类之外的包无能为力,就是因为配置了domain包,但是没有配controller类的包,导致程序无法访问.