分布式锁用Redis好?还是Zookeeper好?
点击上方“码农突围”,马上关注
这里是码农充电第一站,回复“666”,获取一份专属大礼包 真爱,请设置“星标”或点个“在看
Redis 实现 基于 Zookeeper 来实现分布式锁 总结
Redis 实现
SETNX key value
命令,意为 set if not exists
(如果不存在该 key,才去 set 值),就比如说是张三去上厕所,看厕所门锁着,他就不进去了,厕所门开着他才去。SETEX key seconds value
命令,为指定 key 设置过期时间,单位为秒。SET key value ex seconds nx
,加锁的同时设置过期时间。//基于jedis和lua脚本来实现
privatestaticfinal String LOCK_SUCCESS = "OK";
privatestaticfinal Long RELEASE_SUCCESS = 1L;
privatestaticfinal String SET_IF_NOT_EXIST = "NX";
privatestaticfinal String SET_WITH_EXPIRE_TIME = "PX";
@Override
public String acquire() {
try {
// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
// 随机生成一个 value
String requireToken = UUID.randomUUID().toString();
while (System.currentTimeMillis() < end) {
String result = jedis
.set(lockKey, requireToken, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return requireToken;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
log.error("acquire lock due to error", e);
}
returnnull;
}
@Override
public boolean release(String identify) {
if (identify == null) {
returnfalse;
}
//通过lua脚本进行比对删除操作,保证原子性
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = new Object();
try {
result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(identify));
if (RELEASE_SUCCESS.equals(result)) {
log.info("release lock success, requestToken:{}", identify);
returntrue;
}
} catch (Exception e) {
log.error("release lock due to error", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
log.info("release lock failed, requestToken:{}, result:{}", identify, result);
returnfalse;
}
思考:加锁和释放锁的原子性可以用 lua 脚本来保证,那锁的自动续期改如何实现呢?
org.redisson
redisson
3.13.4
private void test() {
//分布式锁名 锁的粒度越细,性能越好
RLock lock = redissonClient.getLock("test_lock");
lock.lock();
try {
//具体业务......
} finally {
lock.unlock();
}
}
// 最常见的使用方法
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
小结:虽然 lock() 有自动续锁机制,但是开发中还是推荐使用 lock(time,timeUnit)
,因为它省掉了整个续期带来的性能损,可以设置过期时间长一点,搭配unlock()
。
public void test() {
RLock lock = redissonClient.getLock("test_lock");
lock.lock(30, TimeUnit.SECONDS);
try {
//.......具体业务
} finally {
//手动释放锁
lock.unlock();
}
}
基于 Zookeeper 来实现分布式锁
create [-s] [-e] path [data]
命令,-s
为创建有序节点,-e
创建临时节点。ls [-w] path
为查看节点命令,-w
为添加一个 watch(监视器),/
为查看根节点所有节点,可以看到我们刚才所创建的节点,同时如果是跟着指定节点名字的话为查看指定节点下的子节点。当第一个线程进来时会去父节点上创建一个临时的顺序节点。 第二个线程进来发现锁已经被持有了,就会为当前持有锁的节点注册一个 watcher 监听器。 第三个线程进来发现锁已经被持有了,因为是顺序节点的缘故,就会为上一个节点去创建一个 watcher 监听器。 当第一个线程释放锁后,删除节点,由它的下一个节点去占有锁。
public class ZooKeeperDistributedLock implements Watcher {
private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.189.131:2181,192.168.189.132:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException e) {
throw new LockException(e);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}
if (this.latch != null) {
this.latch.countDown();
}
}
public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
//获取锁
public boolean tryLock() {
try {
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
Listlocks = zk.getChildren(locksRoot, false);
Collections.sort(locks);
if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = 0; i < locks.size(); i++) {
if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}
this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}
private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
//释放锁
public void unlock() {
try {
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
//异常
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}
总结
实现方式的不同,Redis 实现为去插入一条占位数据,而 ZK 实现为去注册一个临时节点。 遇到宕机情况时,Redis 需要等到过期时间到了后自动释放锁,而 ZK 因为是临时节点,在宕机时候已经是删除了节点去释放锁。 Redis 在没抢占到锁的情况下一般会去自旋获取锁,比较浪费性能,而 ZK 是通过注册监听器的方式获取锁,性能而言优于 Redis。
-End-
最近有一些小伙伴,让我帮忙找一些 面试题 资料,于是我翻遍了收藏的 5T 资料后,汇总整理出来,可以说是程序员面试必备!所有资料都整理到网盘了,欢迎下载!
点击👆卡片,关注后回复【面试题
】即可获取
评论