Reading the Source Series: An Illustrated, Source-Level Walkthrough of the ZooKeeper Election Mechanism

大数据那些事


2021-09-06 23:55



1

ZooKeeper Election Overview


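SID: the server ID. It uniquely identifies a machine in a ZooKeeper cluster. No two machines may share one, and it is the same value as myid.

ZXID: the transaction ID. It identifies one change of server state. At any given moment, the ZXID values on the machines in a cluster are not necessarily identical; this depends on how the ZooKeeper servers process client "update requests".

Epoch: the number of each Leader's term. While there is no Leader, all servers taking part in the same round of voting share the same logical clock value. The value increases each time a round of voting completes.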
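
To make the link between ZXID and Epoch concrete: in ZooKeeper a ZXID is a 64-bit long whose high 32 bits hold the epoch and whose low 32 bits hold a counter within that epoch. The sketch below only mirrors that layout. The class `ZxidSketch` and its method names are made up for illustration and are not ZooKeeper's API.

```java
final class ZxidSketch {
    // Pack an epoch (high 32 bits) and a counter (low 32 bits) into one long.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Extract the epoch from the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Extract the per-epoch counter from the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(2, 5);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid)
                + " counter=" + counterOf(zxid));
    }
}
```

Because the epoch sits in the high bits, any ZXID from a newer epoch compares greater than every ZXID from an older one.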

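On first startup, the ZooKeeper election proceeds as follows (illustrated with a five-server cluster):

(1) Server 1 starts and initiates an election. Server 1 votes for itself. It now has one vote, which is not more than half (3 votes are needed), so the election cannot complete and server 1 stays in the LOOKING state.

(2) Server 2 starts and initiates another election. Servers 1 and 2 each vote for themselves and exchange votes. Server 1 sees that server 2's myid is larger than that of its current candidate (server 1), so it changes its vote to server 2. Server 1 now has 0 votes and server 2 has 2 votes. Nobody has more than half, so the election cannot complete and servers 1 and 2 stay in the LOOKING state.

(3) Server 3 starts and initiates an election. Servers 1 and 2 both change their votes to server 3. The result: server 1 has 0 votes, server 2 has 0 votes, and server 3 has 3 votes. Server 3 now has more than half of the votes and is elected Leader. Servers 1 and 2 change their state to FOLLOWING, and server 3 changes its state to LEADING.

(4) Server 4 starts and initiates an election. Servers 1, 2, and 3 are no longer in the LOOKING state and will not change their votes. After the vote exchange, server 3 has 3 votes and server 4 has 1 vote. Server 4 yields to the majority, changes its vote to server 3, and changes its state to FOLLOWING.

(5) Server 5 starts and, like server 4, becomes a follower.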
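
The five steps above can be replayed in a few lines. This is a deliberately simplified simulation rather than ZooKeeper code: it assumes servers start one at a time in myid order with identical (empty) data, so every LOOKING server ends up voting for the largest myid started so far. The class `FirstStartSketch` and its methods are hypothetical.

```java
final class FirstStartSketch {
    // Votes needed for "more than half" of the cluster.
    static int votesNeeded(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Starts servers 1..clusterSize in order and returns the myid of the
    // first candidate that collects more than half of the votes.
    static int simulate(int clusterSize) {
        int leader = -1;
        for (int myid = 1; myid <= clusterSize; myid++) {
            if (leader != -1) {
                // A leader already exists: the newcomer simply follows it.
                System.out.println("server " + myid + " -> FOLLOWING " + leader);
                continue;
            }
            // All started servers are LOOKING and switch to the largest
            // myid seen so far, which is the server that just started.
            int votesForNewest = myid;
            if (votesForNewest >= votesNeeded(clusterSize)) {
                leader = myid;
                System.out.println("server " + myid + " -> LEADING with "
                        + votesForNewest + " votes");
            } else {
                System.out.println("server " + myid + " starts, "
                        + votesForNewest + " vote(s), still LOOKING");
            }
        }
        return leader;
    }

    public static void main(String[] args) {
        System.out.println("leader = " + simulate(5));
    }
}
```

With five servers the third one to start is the first to reach three votes, which matches step (3).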


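When it is not the first startup, the election proceeds as follows:

(1) A server in the ZooKeeper cluster enters Leader election when either of the following happens:

• The server is initializing at startup.

• The server cannot stay connected to the Leader while running.

(2) When a machine enters the Leader election process, the cluster may be in one of two states:

• The cluster already has a Leader.

In this case, when the machine tries to elect a Leader, it is told who the current Leader is. The machine only needs to connect to the Leader and synchronize its state.

• The cluster really has no Leader.

Suppose the ZooKeeper cluster consists of 5 servers with SIDs 1, 2, 3, 4, and 5 and ZXIDs 8, 8, 8, 7, and 7, and that the server with SID 3 is the Leader. At some moment servers 3 and 5 fail, so a Leader election begins.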

Votes cast by the machines with SID 1, 2, and 4:

Server    (EPOCH, ZXID, SID)
1         (1, 8, 1)
2         (1, 8, 2)
4         (1, 7, 4)

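Rules for electing the Leader, applied in order:

① The vote with the larger EPOCH wins outright.

② If the EPOCHs are equal, the larger transaction ID (ZXID) wins.

③ If the transaction IDs are also equal, the larger server ID (SID) wins.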
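
The three rules amount to a lexicographic comparison on (EPOCH, ZXID, SID). Below is a minimal sketch applying them to the votes of servers 1, 2, and 4 from the example. `VoteOrderSketch` and `Ballot` are illustrative names and not ZooKeeper's own `Vote` class.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class VoteOrderSketch {
    // One vote: (epoch, zxid, sid) of the proposed leader.
    static final class Ballot {
        final long epoch;
        final long zxid;
        final long sid;

        Ballot(long epoch, long zxid, long sid) {
            this.epoch = epoch;
            this.zxid = zxid;
            this.sid = sid;
        }

        @Override
        public String toString() {
            return "(" + epoch + ", " + zxid + ", " + sid + ")";
        }
    }

    // Rule 1: larger epoch; rule 2: larger zxid; rule 3: larger sid.
    static final Comparator<Ballot> ORDER =
            Comparator.<Ballot>comparingLong(b -> b.epoch)
                    .thenComparingLong(b -> b.zxid)
                    .thenComparingLong(b -> b.sid);

    // The ballot that beats all others under the three rules.
    static Ballot winner(List<Ballot> ballots) {
        return Collections.max(ballots, ORDER);
    }

    public static void main(String[] args) {
        List<Ballot> ballots = Arrays.asList(
                new Ballot(1, 8, 1), new Ballot(1, 8, 2), new Ballot(1, 7, 4));
        System.out.println("winner = " + winner(ballots));
    }
}
```

For the example votes the epochs tie, servers 1 and 2 tie on ZXID 8 and beat server 4's ZXID 7, and server 2 wins on the larger SID.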


ZK Election Flow Analysis



2

Election Preparation

Overview of the election preparation source code:

QuorumPeer.java

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Election preparation
    startLeaderElection();
    super.start();
}

synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // Create the vote
            // (1) Vote components: epoch (the leader's term number), zxid (the ID of a
            //     transaction executed during some leader's term), myid (server ID)
            // (2) When voting starts, every server first votes for itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // if (!getView().containsKey(myid)) {
    //     throw new RuntimeException("My id " + myid + " not in the peer list");
    // }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // Create the election algorithm instance
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // 1. Create the QuorumCnxManager, which handles all network communication during the election
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // 2. Start the listener thread
            listener.start();
            // 3. Get ready to start the election
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}


1) Initializing the network communication component

public QuorumCnxManager createCnxnManager() {
    return new QuorumCnxManager(this,
            this.getId(),
            this.getView(),
            this.authServer,
            this.authLearner,
            this.tickTime * this.syncLimit,
            this.getQuorumListenOnAllIPs(),
            this.quorumCnxnThreadsSize,
            this.isQuorumSaslAuthEnabled());
}

public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map<Long, QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    // Create the queues and maps
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    // Starts listener thread that waits for connection requests
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}


2) Initializing the listener thread

Open QuorumCnxManager.Listener and find its run method.

public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    Exception exitException = null;

    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            if (self.shouldUsePortUnification()) {
                LOG.info("Creating TLS-enabled quorum server socket");
                ss = new UnifiedServerSocket(self.getX509Util(), true);
            } else if (self.isSslQuorum()) {
                LOG.info("Creating TLS-only quorum server socket");
                ss = new UnifiedServerSocket(self.getX509Util(), false);
            } else {
                ss = new ServerSocket();
            }

            ss.setReuseAddress(true);

            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(addr.toString());
            // Bind the server address
            ss.bind(addr);
            // Loop until shutdown
            while (!shutdown) {
                try {
                    // Block, waiting for a connection request to handle
                    client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + formatInetAddr((InetSocketAddress) client.getRemoteSocketAddress()));
                    // Receive and handle the connection request
                    // asynchronously if the quorum sasl authentication is
                    // enabled. This is required because sasl server
                    // authentication process may take few seconds to finish,
                    // this may delay next peer connection requests.
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                    LOG.warn("The socket is listening for the election accepted "
                            + "and it timed out unexpectedly, but will retry."
                            + "see ZOOKEEPER-2836");
                }
            }
        } catch (IOException e) {
            ... ...
            closeSocket(client);
        }
    }
    ... ...
}

3) Election preparation

Open FastLeaderElection.

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;

    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // Initialize the queues and the messenger
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}




3

Election Execution

Overview of the election execution source code:

QuorumPeer.java

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Restore data on cold start
    loadDataBase();
    // Start the server connection factory
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the election environment
    startLeaderElection();
    // Run the election
    super.start();
}


1) QuorumPeer is a thread, so calling super.start() in effect runs the run() method in QuorumPeer.java

After ZooKeeper starts, every server is first in the LOOKING state. Through the election, one server becomes the Leader and the others become Followers.

QuorumPeer.java

public void run() {
    updateThreadName();

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for (QuorumServer s : getView().values()) {
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                RemotePeerBean rBean = new RemotePeerBean(this, s);
                try {
                    MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                    jmxRemotePeerBean.put(s.id, rBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk =
                        new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Run the election; when it ends, it returns the winning vote for the elected Leader
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
            start_fle = Time.currentElapsedTime();
        }
    } finally {
        ... ...
    }
}


2) Press Ctrl+Alt+B on lookForLeader() to jump to its implementation class, FastLeaderElection.java

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        // During a normal startup, every other server sends me a vote.
        // recvset keeps the latest valid vote from each server.
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        // Stores votes that fall outside the current valid election
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        // Maximum wait time for one election round; the default is 0.2 s
        int notTimeout = finalizeWait;

        // Each new election round does logicalclock++.
        // Until valid epoch data exists, the logical clock is used in its place.
        // Leader election rule: compare epoch (term), zxid (transaction ID), and
        // serverid (myid) in that order; the larger one becomes leader.
        synchronized (this) {
            // Update the logical clock; it must be updated for every election
            // logicalclock = epoch
            logicalclock.incrementAndGet();
            // Update the proposal (serverid, zxid, epoch)
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // Broadcast the vote: send my own vote to the other servers
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // Keep running election rounds until one succeeds
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)) {
            … …
        }
        return null;
    } finally {
        … …
    }
}
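
The comments in lookForLeader() say that recvset keeps the latest valid vote from each server, and the overview earlier says a candidate needs more than half of the votes. The loop body is elided in the listing, so the following is not that code. It is only a sketch of how such a majority check over a recvset-like map could look, with the map simplified to "voter SID to proposed leader SID". `QuorumTallySketch` and `hasMajority` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class QuorumTallySketch {
    // True if strictly more than half of the cluster voted for proposedLeader.
    // recvset maps each voter's SID to the SID it currently proposes.
    static boolean hasMajority(Map<Long, Long> recvset, long proposedLeader, int clusterSize) {
        int count = 0;
        for (long votedFor : recvset.values()) {
            if (votedFor == proposedLeader) {
                count++;
            }
        }
        return count > clusterSize / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(2L, 3L);
        recvset.put(3L, 3L);
        System.out.println("server 3 elected: " + hasMajority(recvset, 3L, 5));
    }
}
```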


3) Open sendNotifications, which broadcasts the vote by sending this server's vote to the other servers

private void sendNotifications() {
    // Iterate over the voting participants and send the vote to each server
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // Create the vote message to send
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // Put the vote message on the send queue
        sendqueue.offer(notmsg);
    }
}


4) Find the WorkerSender thread in the FastLeaderElection.java class.

class WorkerSender extends ZooKeeperThread {
    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {
        while (!stop) {
            try {
                // Wait on the queue (up to 3 s per poll) for a vote to send
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                // Process the vote to send
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     * @param m     message to send
     */
    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                            m.leader,
                                            m.zxid,
                                            m.electionEpoch,
                                            m.peerEpoch,
                                            m.configData);
        // Send the vote
        manager.toSend(m.sid, requestBuffer);
    }
}

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    // If the message is addressed to myself, put it straight into my own recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
        /*
         * Start a new connection if doesn't have one already.
         */
        // If the message is for another server, create that server's send queue
        // or fetch the existing one, and put the message on that queue
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
            SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        // Send the vote out
        connectOne(sid);
    }
}
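
The routing decision in toSend can be tried in isolation. The sketch below keeps only the two branches (loopback to the receive queue, or a per-server send queue created with putIfAbsent) and omits the connection step. `ToSendSketch` is a hypothetical stand-in for QuorumCnxManager, and the queue capacities are arbitrary values chosen for the example.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

final class ToSendSketch {
    // Arbitrary capacities for the sketch, not ZooKeeper's constants.
    static final int RECV_CAPACITY = 16;
    static final int SEND_CAPACITY = 4;

    final long mySid;
    final ArrayBlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(RECV_CAPACITY);
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<>();

    ToSendSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            // Loopback: a message to myself goes straight to my receive queue.
            recvQueue.offer(b.duplicate());
            return;
        }
        // putIfAbsent returns the queue already registered for sid, or null
        // if the freshly created queue was installed.
        ArrayBlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, fresh);
        ArrayBlockingQueue<ByteBuffer> target = (existing != null) ? existing : fresh;
        target.offer(b);
        // The real method would now make sure a connection to sid exists.
    }

    public static void main(String[] args) {
        ToSendSketch node = new ToSendSketch(1);
        node.toSend(1, ByteBuffer.allocate(4));
        node.toSend(2, ByteBuffer.allocate(4));
        System.out.println("recvQueue=" + node.recvQueue.size()
                + " sendQueues=" + node.queueSendMap.keySet());
    }
}
```

Creating the candidate queue up front and letting putIfAbsent decide keeps the lookup and the insertion atomic, so two threads sending to the same server cannot each install their own queue.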


5) If the data is addressed to this server itself, it is added to this server's own receive queue

public void addToRecvQueue(Message msg) {
    synchronized (recvQLock) {
        if (recvQueue.remainingCapacity() == 0) {
            try {
                recvQueue.remove();
            } catch (NoSuchElementException ne) {
                // element could be removed by poll()
                LOG.debug("Trying to remove from an empty " +
                    "recvQueue. Ignoring exception " + ne);
            }
        }
        try {
            // Add the vote addressed to myself to the recvQueue
            recvQueue.add(msg);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert element in the recvQueue " + ie);
        }
    }
}

6) Adding data to the send queue

private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
    ByteBuffer buffer) {
    if (queue.remainingCapacity() == 0) {
        try {
            queue.remove();
        } catch (NoSuchElementException ne) {
            // element could be removed by poll()
            LOG.debug("Trying to remove from an empty " +
                    "Queue. Ignoring exception " + ne);
        }
    }
    try {
        // Add the message to send to the send queue
        queue.add(buffer);
    } catch (IllegalStateException ie) {
        // This should never happen
        LOG.error("Unable to insert an element in the queue " + ie);
    }
}
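
Both addToRecvQueue and addToSendQueue follow the same pattern: when the bounded queue is full, the oldest element is discarded to make room for the newest. A minimal single-threaded sketch of that pattern follows. `DropOldestSketch` and `addDroppingOldest` are illustrative names, and the capacity of 2 is an arbitrary choice.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

final class DropOldestSketch {
    // Adds item to the queue, evicting the head first if the queue is full.
    static <T> void addDroppingOldest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            try {
                queue.remove();
            } catch (NoSuchElementException ignored) {
                // A concurrent consumer may have emptied the queue already.
            }
        }
        // add() throws IllegalStateException if the queue is still full,
        // which can only happen with a concurrent producer.
        queue.add(item);
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        addDroppingOldest(q, "vote-1");
        addDroppingOldest(q, "vote-2");
        addDroppingOldest(q, "vote-3");
        System.out.println(q);
    }
}
```

For election traffic this trade-off is reasonable, since a newer vote from a server supersedes its older ones.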


7) Establishing a connection to the server node being sent to

synchronized void connectOne(long sid) {
    if (senderWorkerMap.get(sid) != null) {
        LOG.debug("There is a connection already for server " + sid);
        return;
    }
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
                return;
        }
        if (lastSeenQV != null && lastProposedView.containsKey(sid)
                && (!knownId || (lastProposedView.get(sid).electionAddr !=
                lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            if (connectOne(sid, lastProposedView.get(sid).electionAddr))
                return;
        }
        if (!knownId) {
            LOG.warn("Invalid server id: " + sid);
            return;
        }
    }
}

synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr) {
    if (senderWorkerMap.get(sid) != null) {
        LOG.debug("There is a connection already for server " + sid);
        return true;
    }

    Socket sock = null;
    try {
        LOG.debug("Opening channel to server " + sid);
        if (self.isSslQuorum()) {
            SSLSocket sslSock = self.getX509Util().createSSLSocket();
            setSockOpts(sslSock);
            sslSock.connect(electionAddr, cnxTO);
            sslSock.startHandshake();
            sock = sslSock;
            LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
        } else {
            sock = new Socket();
            setSockOpts(sock);
            sock.connect(electionAddr, cnxTO);
        }
        LOG.debug("Connected to server " + sid);
        // Sends connection request asynchronously if the quorum
        // sasl authentication is enabled. This is required because
        // sasl server authentication process may take few seconds to
        // finish, this may delay next peer connection requests.
        if (quorumSaslAuthEnabled) {
            initiateConnectionAsync(sock, sid);
        } else {
            // Handle the connection
            initiateConnection(sock, sid);
        }
        return true;
    } catch (UnresolvedAddressException e) {
        ... ...
    }
}

public void initiateConnection(final Socket sock, final Long sid) {
    try {
        startConnection(sock, sid);
    } catch (IOException e) {
        LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
                new Object[] { sid, sock.getRemoteSocketAddress() }, e);
        closeSocket(sock);
        return;
    }
}


8) Creating and starting the sender thread and the receiver thread

private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        // Send data to the remote server through the output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = formatInetAddr(self.getElectionAddress());
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        // Read the votes sent by the other side through the input stream
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }

    // If lost the challenge, then drop the new connection
    // If the other side's id is larger than mine, I am not entitled to keep
    // this outgoing connection, so I close my own client socket
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        // Initialize the sender and the receiver
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY));
        // Start the sender thread and the receiver thread
        sw.start();
        rw.start();

        return true;
    }
    return false;
}
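
The `sid > self.getId()` check at the end of startConnection is a tie-break: of the two servers in a pair, only the one with the larger id keeps the connection it initiated. The sketch below isolates that rule and counts how many initiated connections survive in a small cluster. `TieBreakSketch` and its methods are hypothetical, and what the larger-id server does when it accepts and drops an incoming connection is outside this listing and not modeled here.

```java
final class TieBreakSketch {
    // Mirrors the check in startConnection: the initiator keeps the socket
    // only if the remote sid is not larger than its own id.
    static boolean keepsOutgoingConnection(long myId, long remoteSid) {
        return remoteSid <= myId;
    }

    // Counts ordered pairs (initiator, remote) whose connection survives.
    static int survivingLinks(long[] ids) {
        int count = 0;
        for (long initiator : ids) {
            for (long remote : ids) {
                if (initiator != remote && keepsOutgoingConnection(initiator, remote)) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long[] ids = {1, 2, 3};
        System.out.println("surviving links: " + survivingLinks(ids));
    }
}
```

With three servers, six connection attempts are possible but only three survive, one per pair, so each pair of servers ends up with a single election channel.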


9) Open SendWorker and find the run method of that class

QuorumCnxManager.java

public void run() {
    threadCnt.incrementAndGet();
    try {
        /**
         * If there is nothing in the queue to send, then we
         * send the lastMessage to ensure that the last message
         * was received by the peer. The message could be dropped
         * in case self or the peer shutdown their connection
         * (and exit the thread) prior to reading/processing
         * the last message. Duplicate messages are handled correctly
         * by the peer.
         *
         * If the send queue is non-empty, then we have a recent
         * message than that stored in lastMessage. To avoid sending
         * stale message, we should send the message in the send queue.
         */
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid=" + sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }

    try {
        // As long as the connection has not been closed
        while (running && !shutdown && sock != null) {

            ByteBuffer b = null;
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    // Keep taking messages from the send queue and sending them
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }

                if (b != null) {
                    // Record the most recent message for the server with this sid
                    lastMessageSent.put(sid, b);
                    // Send it
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue",
                        e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid
                + " my id = " + QuorumCnxManager.this.mySid
                + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}

synchronized void send(ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    try {
        b.position(0);
        b.get(msgBytes);
    } catch (BufferUnderflowException be) {
        LOG.error("BufferUnderflowException ", be);
        return;
    }
    // Write to the output stream
    dout.writeInt(b.capacity());
    dout.write(b.array());
    dout.flush();
}


10) Open RecvWorker and find the run method of that class

QuorumCnxManager.java

public void run() {
    threadCnt.incrementAndGet();
    try {
        // As long as the connection has not been closed
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            byte[] msgArray = new byte[length];
            // Receive the message from the input stream
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            // Accept the vote sent by the other side
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = "
                + QuorumCnxManager.this.mySid + ", error = ", e);
    } finally {
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        closeSocket(sock);
    }
}

public void addToRecvQueue(Message msg) {
    synchronized (recvQLock) {
        if (recvQueue.remainingCapacity() == 0) {
            try {
                recvQueue.remove();
            } catch (NoSuchElementException ne) {
                // element could be removed by poll()
                LOG.debug("Trying to remove from an empty " +
                    "recvQueue. Ignoring exception " + ne);
            }
        }
        try {
            // Put the received message on the receive queue
            recvQueue.add(msg);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert element in the recvQueue " + ie);
        }
    }
}
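
SendWorker.send and RecvWorker.run together implement a length-prefixed frame: a 4-byte length followed by the payload, with the reader rejecting lengths outside a sane range. The round trip can be tried without sockets, as sketched below over in-memory streams. `FramingSketch` is an illustrative class, and `MAX_PACKET` is an assumed stand-in for PACKETMAXSIZE whose real value is not given in the listing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class FramingSketch {
    // Assumed upper bound for the sketch; stands in for PACKETMAXSIZE.
    static final int MAX_PACKET = 512 * 1024;

    // Writer side: 4-byte length, then the whole buffer content.
    static void writeFrame(DataOutputStream out, ByteBuffer b) throws IOException {
        byte[] bytes = new byte[b.capacity()];
        ByteBuffer view = b.duplicate();
        view.clear();              // position 0, limit = capacity
        view.get(bytes);
        out.writeInt(bytes.length);
        out.write(bytes);
        out.flush();
    }

    // Reader side: read the length, validate it, then read exactly that many bytes.
    static ByteBuffer readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        in.readFully(msg, 0, length);
        return ByteBuffer.wrap(msg);
    }

    // Sends payload through an in-memory stream and returns what the reader sees.
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            writeFrame(new DataOutputStream(sink), ByteBuffer.wrap(payload));
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(sink.toByteArray()));
            return readFrame(in).array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] echoed = roundTrip(new byte[] {1, 2, 3});
        System.out.println("received " + echoed.length + " bytes");
    }
}
```

The length check matters because the reader allocates a buffer of the announced size, so a corrupted or hostile length field would otherwise trigger an oversized allocation.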


11) Find the WorkerReceiver thread in the FastLeaderElection.java class.

class WorkerReceiver extends ZooKeeperThread {
    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerReceiver(QuorumCnxManager manager) {
        super("WorkerReceiver");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {

        Message response;
        while (!stop) {
            // Sleeps on receive
            try {
                // Take an election vote message (sent by another server) from the RecvQueue
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                … …
            } catch (InterruptedException e) {
                LOG.warn("Interrupted Exception while waiting for new message" +
                        e.toString());
            }
        }
        LOG.info("WorkerReceiver is down");
    }
}



