使用ZooKeeper实现分布式队列、分布式锁和选举详解
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
1、分布式队列
1)、offer方法
public class DistributedQueue {
    public boolean offer(byte[] data) throws KeeperException, InterruptedException {
        for (; ; ) {
            try {
                zookeeper.create(dir + "/" + prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
                return true;
            } catch (KeeperException.NoNodeException e) {
                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
            }
        }
    }
2)、element方法
public class DistributedQueue {
    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
        Map<Long, String> orderedChildren;
        while (true) {
            try {
               //获取所有排好序的子节点
                orderedChildren = orderedChildren(null);
            } catch (KeeperException.NoNodeException e) {
                throw new NoSuchElementException();
            }
            if (orderedChildren.size() == 0) {
                throw new NoSuchElementException();
            }
      //返回队头节点的数据
            for (String headNode : orderedChildren.values()) {
                if (headNode != null) {
                    try {
                        return zookeeper.getData(dir + "/" + headNode, false, null);
                    } catch (KeeperException.NoNodeException e) {
                       //另一个客户端已经移除了队头节点,尝试获取下一个节点
                    }
                }
            }
        }
    }
  
    private Map<Long, String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
        Map<Long, String> orderedChildren = new TreeMap<>();
        List<String> childNames;
        childNames = zookeeper.getChildren(dir, watcher);
        for (String childName : childNames) {
            try {
                if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
                    LOG.warn("Found child node with improper name: {}", childName);
                    continue;
                }
                String suffix = childName.substring(prefix.length());
                Long childId = Long.parseLong(suffix);
                orderedChildren.put(childId, childName);
            } catch (NumberFormatException e) {
                LOG.warn("Found child node with improper format : {}", childName, e);
            }
        }
        return orderedChildren;
    }  
3)、remove方法
public class DistributedQueue {
    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
        Map<Long, String> orderedChildren;
        while (true) {
            try {
               //获取所有排好序的子节点
                orderedChildren = orderedChildren(null);
            } catch (KeeperException.NoNodeException e) {
                throw new NoSuchElementException();
            }
            if (orderedChildren.size() == 0) {
                throw new NoSuchElementException();
            }
      //移除队头节点
            for (String headNode : orderedChildren.values()) {
                String path = dir + "/" + headNode;
                try {
                    byte[] data = zookeeper.getData(path, false, null);
                    zookeeper.delete(path, -1);
                    return data;
                } catch (KeeperException.NoNodeException e) {
                    //另一个客户端已经移除了队头节点,尝试移除下一个节点
                }
            }
        }
    }
2、分布式锁
1)、排他锁
2)、羊群效应
3)、共享锁
4)、排他锁源码解析
public class WriteLock extends ProtocolSupport {
    public synchronized boolean lock() throws KeeperException, InterruptedException {
        if (isClosed()) {
            return false;
        }
       //确认持久父节点是否存在
        ensurePathExists(dir);
       //真正获取锁的逻辑 调用ProtocolSupport的retryOperation()方法
        return (Boolean) retryOperation(zop);
    }
class ProtocolSupport {
    protected Object retryOperation(ZooKeeperOperation operation)
        throws KeeperException, InterruptedException {
        KeeperException exception = null;
        for (int i = 0; i < RETRY_COUNT; i++) {
            try {
               //调用LockZooKeeperOperation的execute()方法
                return operation.execute();
            } catch (KeeperException.SessionExpiredException e) {
                LOG.warn("Session expired {}. Reconnecting...", zookeeper, e);
                throw e;
            } catch (KeeperException.ConnectionLossException e) {
                if (exception == null) {
                    exception = e;
                }
                LOG.debug("Attempt {} failed with connection loss. Reconnecting...", i);
                retryDelay(i);
            }
        }
        throw exception;
    }
public class WriteLock extends ProtocolSupport {
    private class LockZooKeeperOperation implements ZooKeeperOperation {
        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
            throws KeeperException, InterruptedException {
            List<String> names = zookeeper.getChildren(dir, false);
            for (String name : names) {
                if (name.startsWith(prefix)) {
                    id = name;
                    LOG.debug("Found id created last time: {}", id);
                    break;
                }
            }
            if (id == null) {
                id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);
                LOG.debug("Created id: {}", id);
            }
        }
        /**
         * Performs one lock-acquisition attempt.
         *
         * <p>Ensures this session has a lock node ({@code id}), lists and sorts all children
         * of {@code dir}, and then either takes the lock (no node sorts before ours and
         * {@code isOwner()} is true) or registers a {@code LockWatcher} on the node
         * immediately preceding ours. The loop only repeats when {@code id} was reset to
         * {@code null} because the children list came back empty.
         *
         * @return {@code Boolean.TRUE} if the lock was acquired (the lock listener, if any,
         *     is notified first); {@code Boolean.FALSE} otherwise
         * @throws KeeperException if a ZooKeeper call fails
         * @throws InterruptedException if a ZooKeeper call is interrupted
         */
        @SuppressFBWarnings(
            value = "NP_NULL_PARAM_DEREF_NONVIRTUAL",
            justification = "findPrefixInChildren will assign a value to this.id")
        public boolean execute() throws KeeperException, InterruptedException {
            do {
                if (id == null) {
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // Find this session's existing lock node or create a new one; sets id.
                    findPrefixInChildren(prefix, zookeeper, dir);
                    idName = new ZNodeName(id);
                }
                // Fetch all child nodes of the lock directory.
                List<String> names = zookeeper.getChildren(dir, false);
                if (names.isEmpty()) {
                    LOG.warn("No children in: {} when we've just created one! Lets recreate it...", dir);
                    // Clearing id makes the do/while condition true, so the node is looked up or created again.
                    id = null;
                } else {
                    // Sort all child nodes (by ZNodeName ordering), using full paths.
                    SortedSet<ZNodeName> sortedNames = new TreeSet<>();
                    for (String name : names) {
                        sortedNames.add(new ZNodeName(dir + "/" + name));
                    }
                    // The first node in sort order is recorded as the current owner.
                    ownerId = sortedNames.first().getName();
                    SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
                    // Is there any node that sorts before ours?
                    if (!lessThanMe.isEmpty()) {
                        ZNodeName lastChildName = lessThanMe.last();
                        lastChildId = lastChildName.getName();
                        LOG.debug("Watching less than me node: {}", lastChildId);
                        // A smaller node exists: call exists() to register a watcher on the node just before ours.
                        Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
                        if (stat != null) {
                            return Boolean.FALSE;
                        } else {
                            // NOTE(review): the predecessor vanished between getChildren() and exists().
                            // id is still non-null here, so the loop exits and FALSE is returned without
                            // re-checking ownership. Confirm that something else re-triggers the attempt.
                            LOG.warn("Could not find the stats for less than me: {}", lastChildName.getName());
                        }
                    } 
                    // No node sorts before ours: take the lock if isOwner() agrees.
                   else {
                        if (isOwner()) {
                            LockListener lockListener = getLockListener();
                            if (lockListener != null) {
                                lockListener.lockAcquired();
                            }
                            return Boolean.TRUE;
                        }
                    }
                }
            }
            while (id == null);
            return Boolean.FALSE;
        }
public class WriteLock extends ProtocolSupport {
    public synchronized void unlock() throws RuntimeException {
        if (!isClosed() && id != null) {
            try {
        //删除当前节点,此时会触发后一个节点的watcher
                ZooKeeperOperation zopdel = () -> {
                    zookeeper.delete(id, -1);
                    return Boolean.TRUE;
                };
                zopdel.execute();
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
            } catch (KeeperException e) {
                LOG.warn("Unexpected exception", e);
                throw new RuntimeException(e.getMessage(), e);
            } finally {
                LockListener lockListener = getLockListener();
                if (lockListener != null) {
                    lockListener.lockReleased();
                }
                id = null;
            }
        }
    }
3、选举
public class LeaderElectionSupport implements Watcher {    
    public synchronized void start() {
        state = State.START;
        dispatchEvent(EventType.START);
        LOG.info("Starting leader election support");
        if (zooKeeper == null) {
            throw new IllegalStateException(
                "No instance of zookeeper provided. Hint: use setZooKeeper()");
        }
        if (hostName == null) {
            throw new IllegalStateException(
                "No hostname provided. Hint: use setHostName()");
        }
        try {
           //发起选举请求 创建临时顺序节点
            makeOffer();
           //选举请求是否被满足
            determineElectionStatus();
        } catch (KeeperException | InterruptedException e) {
            becomeFailed(e);
        }
    }
  
    private void makeOffer() throws KeeperException, InterruptedException {
        state = State.OFFER;
        dispatchEvent(EventType.OFFER_START);
        LeaderOffer newLeaderOffer = new LeaderOffer();
        byte[] hostnameBytes;
        synchronized (this) {
            newLeaderOffer.setHostName(hostName);
            hostnameBytes = hostName.getBytes();
            newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
                                                        hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                                        CreateMode.EPHEMERAL_SEQUENTIAL));
            leaderOffer = newLeaderOffer;
        }
        LOG.debug("Created leader offer {}", leaderOffer);
        dispatchEvent(EventType.OFFER_COMPLETE);
    }
  
    private void determineElectionStatus() throws KeeperException, InterruptedException {
        state = State.DETERMINE;
        dispatchEvent(EventType.DETERMINE_START);
        LeaderOffer currentLeaderOffer = getLeaderOffer();
        String[] components = currentLeaderOffer.getNodePath().split("/");
        currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1].substring("n_".length())));
    //获取所有子节点并排序
        List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(rootNodeName, false));
        for (int i = 0; i < leaderOffers.size(); i++) {
            LeaderOffer leaderOffer = leaderOffers.get(i);
            if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {
                LOG.debug("There are {} leader offers. I am {} in line.", leaderOffers.size(), i);
                dispatchEvent(EventType.DETERMINE_COMPLETE);
        
               //如果当前节点是第一个,则成为Leader
                if (i == 0) {
                    becomeLeader();
                } 
               //如果有选举请求在当前节点前面,则进行等待,调用exist()向前一个节点注册watcher
               else {
                    becomeReady(leaderOffers.get(i - 1));
                }
                break;
            }
        }
    }    
作者 | 邋遢的流浪剑客
来源 | csdn.net/qq_40378034/article/details/117014648

评论
















