解读源码系列——源码级图文详解Zookeeper选举机制

Zookeeper选举概述
SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加
当第一次启动时,Zookeeper的选举机制如下图:

(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
(5)服务器5启动,同4一样当小弟。
当非第一次启动时,选举过程如下所示:

(1)当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
•服务器初始化启动。
•服务器运行期间无法和Leader保持连接。
(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
•集群中本来就已经存在一个Leader。
对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
•集群中确实不存在Leader。
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。
SID为1、2、4的机器投票情况:
(EPOCH,ZXID,SID) | (EPOCH,ZXID,SID) | (EPOCH,ZXID,SID) |
| (1,8,1) | (1,8,2) | (1,7,4) |
选举Leader规则:
①EPOCH大的直接胜出
②EPOCH相同,事务id大的胜出
③事务id相同,服务器id大的胜出
ZK选举流程分析

选举准备
选举准备源码概览:

QuorumPeer.java
public synchronized void start() {if (!getView().containsKey(myid)) {throw new RuntimeException("My id" + myid + " not in the peer list");}loadDataBase();startServerCnxnFactory();try {adminServer.start();} catch (AdminServerException e) {LOG.warn("Problem startingAdminServer", e);System.out.println(e);}// 选举准备startLeaderElection();super.start();}synchronized public void startLeaderElection() {try {if (getPeerState() ==ServerState.LOOKING) {// 创建选票// (1)选票组件:epoch(leader的任期代号)、zxid(某个leader当选期间执行的事务编号)、myid(serverid)// (2)开始选票时,都是先投自己currentVote = new Vote(myid,getLastLoggedZxid(), getCurrentEpoch());}} catch(IOException e) {RuntimeException re = newRuntimeException(e.getMessage());re.setStackTrace(e.getStackTrace());throw re;}// if (!getView().containsKey(myid)) {// throw new RuntimeException("My id " + myid + " not in thepeer list");//}if (electionType == 0) {try {udpSocket = new DatagramSocket(getQuorumAddress().getPort());responder = new ResponderThread();responder.start();} catch (SocketException e) {throw new RuntimeException(e);}}// 创建选举算法实例this.electionAlg = createElectionAlgorithm(electionType);}protectedElection createElectionAlgorithm(intelectionAlgorithm){Election le=null;//TODO: use a factory rather than a switchswitch (electionAlgorithm) {case 0:le = new LeaderElection(this);break;case 1:le = new AuthFastLeaderElection(this);break;case 2:le = new AuthFastLeaderElection(this,true);break;case 3:// 1创建QuorumCnxnManager,负责选举过程中的所有网络通信QuorumCnxManager qcm = createCnxnManager();QuorumCnxManager oldQcm =qcmRef.getAndSet(qcm);if (oldQcm != null) {LOG.warn("Clobberingalready-set QuorumCnxManager (restarting leader election?)");oldQcm.halt();}QuorumCnxManager.Listener listener = qcm.listener;if(listener != null){// 2启动监听线程listener.start();// 3准备开始选举FastLeaderElection fle = new FastLeaderElection(this,qcm);fle.start();le = fle;} else {LOG.error("Null listener wheninitializing cnx manager");}break;default:assert false;}return le;}
1)网络通信组件初始化
public QuorumCnxManager createCnxnManager() {return new QuorumCnxManager(this,this.getId(),this.getView(),this.authServer,this.authLearner,this.tickTime * this.syncLimit,this.getQuorumListenOnAllIPs(),this.quorumCnxnThreadsSize,this.isQuorumSaslAuthEnabled());}public QuorumCnxManager(QuorumPeerself,final long mySid,Map<Long,QuorumPeer.QuorumServer> view,QuorumAuthServer authServer,QuorumAuthLearner authLearner,int socketTimeout,boolean listenOnAllIPs,int quorumCnxnThreadsSize,boolean quorumSaslAuthEnabled) {// 创建各种队列this.recvQueue= new ArrayBlockingQueue<Message>(RECV_CAPACITY);this.queueSendMap= new ConcurrentHashMap<Long,ArrayBlockingQueue<ByteBuffer>>();this.senderWorkerMap= new ConcurrentHashMap<Long,SendWorker>();this.lastMessageSent= new ConcurrentHashMap<Long,ByteBuffer>();String cnxToValue =System.getProperty("zookeeper.cnxTimeout");if(cnxToValue != null){this.cnxTO =Integer.parseInt(cnxToValue);}this.self = self;this.mySid = mySid;this.socketTimeout = socketTimeout;this.view = view;this.listenOnAllIPs = listenOnAllIPs;initializeAuth(mySid, authServer, authLearner,quorumCnxnThreadsSize,quorumSaslAuthEnabled);// Starts listener thread that waits forconnection requestslistener = new Listener();listener.setName("QuorumPeerListener");}
2)监听线程初始化
点击QuorumCnxManager.Listener,找到对应的run方法
public void run() {int numRetries = 0;InetSocketAddress addr;Socket client = null;Exception exitException = null;while ((!shutdown) &&(portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {try {if(self.shouldUsePortUnification()) {LOG.info("CreatingTLS-enabled quorum server socket");ss = newUnifiedServerSocket(self.getX509Util(), true);} else if (self.isSslQuorum()) {LOG.info("CreatingTLS-only quorum server socket");ss = newUnifiedServerSocket(self.getX509Util(), false);} else {ss = new ServerSocket();}ss.setReuseAddress(true);if (self.getQuorumListenOnAllIPs()){int port =self.getElectionAddress().getPort();addr = newInetSocketAddress(port);} else {// Resolve hostname for thisserver in case the// underlying ip address haschanged.self.recreateSocketAddresses(self.getId());addr =self.getElectionAddress();}LOG.info("My election bindport: " + addr.toString());setName(addr.toString());// 绑定服务器地址ss.bind(addr);// 死循环while (!shutdown) {try {// 阻塞,等待处理请求client = ss.accept();setSockOpts(client);LOG.info("Receivedconnection request "+formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));// Receive and handle theconnection request// asynchronously if thequorum sasl authentication is// enabled. This isrequired because sasl server// authentication processmay take few seconds to finish,// this may delay next peer connectionrequests.if (quorumSaslAuthEnabled){receiveConnectionAsync(client);} else {receiveConnection(client);}numRetries = 0;} catch (SocketTimeoutExceptione) {LOG.warn("The socketis listening for the election accepted "+ "and ittimed out unexpectedly, but will retry."+ "seeZOOKEEPER-2836");}}} catch (IOException e) {... ...closeSocket(client);}}... ..
.
}
3)选举准备
点击FastLeaderElection
public FastLeaderElection(QuorumPeerself, QuorumCnxManager manager){this.stop = false;this.manager = manager;starter(self, manager);}private void starter(QuorumPeer self,QuorumCnxManager manager) {this.self = self;proposedLeader = -1;proposedZxid = -1;// 初始化队列和信息sendqueue = new LinkedBlockingQueue<ToSend>();recvqueue = new LinkedBlockingQueue<Notification>();this.messenger= new Messenger(manager);}
选举执行
选举执行源码概览:

QuorumPeer.java
public synchronized void start() {if (!getView().containsKey(myid)) {throw new RuntimeException("My id" + myid + " not in the peer list");}// 冷启动数据恢复loadDataBase();startServerCnxnFactory();try {// 启动通信工厂实例对象adminServer.start();} catch (AdminServerException e) {LOG.warn("Problem startingAdminServer", e);System.out.println(e);}// 准备选举环境startLeaderElection();// 执行选举super.start();}
1)执行super.start();就相当于执行QuorumPeer.java类中的run()方法
当Zookeeper启动后,首先都是Looking状态,通过选举,让其中一台服务器成为Leader,其他的服务器成为Follower。
QuorumPeer.java
public void run() {updateThreadName();LOG.debug("Starting quorumpeer");try {jmxQuorumBean = new QuorumBean(this);MBeanRegistry.getInstance().register(jmxQuorumBean, null);for(QuorumServer s:getView().values()){ZKMBeanInfo p;if (getId() == s.id) {p = jmxLocalPeerBean = new LocalPeerBean(this);try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);} catch (Exception e) {LOG.warn("Failed to register withJMX", e);jmxLocalPeerBean = null;}} else {RemotePeerBean rBean = new RemotePeerBean(this, s);try {MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);jmxRemotePeerBean.put(s.id, rBean);} catch (Exception e) {LOG.warn("Failed toregister with JMX", e);}}}} catch (Exception e) {LOG.warn("Failed to register withJMX", e);jmxQuorumBean = null;}try {/** Main loop*/while (running) {switch (getPeerState()) {case LOOKING:LOG.info("LOOKING");if(Boolean.getBoolean("readonlymode.enabled")) {LOG.info("Attemptingto start ReadOnlyZooKeeperServer");// Create read-only serverbut don't start it immediatelyfinal ReadOnlyZooKeeperServer roZk =new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);// Instead of starting roZkimmediately, wait some grace// period before we decidewe're partitioned.//// Thread is used herebecause otherwise it would require// changes in each ofelection strategy classes which is// unnecessary codecoupling.Thread roZkMgr = newThread() {public void run() {try {// lower-boundgrace period to 2 secssleep(Math.max(2000, tickTime));if(ServerState.LOOKING.equals(getPeerState())) {roZk.startup();}} catch(InterruptedException e) {LOG.info("Interrupted while attempting to startReadOnlyZooKeeperServer, not started");} catch (Exceptione) {LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);}}};try {roZkMgr.start();reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE =false;startLeaderElection();}// 进行选举,选举结束,返回最终成为Leader胜选的那张选票setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);} finally {// If the thread is inthe the grace period, interrupt// to come out ofwaiting.roZkMgr.interrupt();roZk.shutdown();}} else {try {reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE =false;startLeaderElection();}setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);}}break;case OBSERVING:try {LOG.info("OBSERVING");setObserver(makeObserver(logFactory));observer.observeLeader();} catch (Exception e) {LOG.warn("Unexpectedexception",e );} finally {observer.shutdown();setObserver(null);updateServerState();}break;case FOLLOWING:try {LOG.info("FOLLOWING");setFollower(makeFollower(logFactory));follower.followLeader();} catch (Exception e) {LOG.warn("Unexpectedexception",e);} finally {follower.shutdown();setFollower(null);updateServerState();}break;case LEADING:LOG.info("LEADING");try {setLeader(makeLeader(logFactory));leader.lead();setLeader(null);} catch (Exception e) {LOG.warn("Unexpectedexception",e);} finally {if (leader != null) {leader.shutdown("Forcing shutdown");setLeader(null);}updateServerState();}break;}start_fle =Time.currentElapsedTime();}} finally {... ...}}
2)ctrl+alt+b点击lookForLeader()的实现类FastLeaderElection.java
public Vote lookForLeader()throws InterruptedException {try {self.jmxLeaderElectionBean = newLeaderElectionBean();MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean,self.jmxLocalPeerBean);} catch (Exception e) {LOG.warn("Failed to register withJMX", e);self.jmxLeaderElectionBean = null;}if (self.start_fle == 0) {self.start_fle =Time.currentElapsedTime();}try {// 正常启动中,所有其他服务器,都会给我发送一个投票// 保存每一个服务器的最新合法有效的投票HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();// 存储合法选举之外的投票结果HashMap<Long, Vote> outofelection= new HashMap<Long, Vote>();// 一次选举的最大等待时间,默认值是0.2sint notTimeout = finalizeWait;// 每发起一轮选举,logicalclock++// 在没有合法的epoch数据之前,都使用逻辑时钟代替// 选举leader的规则:依次比较 epoch(任期) zxid(事务id) serverid(myid) 谁大谁当选leadersynchronized(this){// 更新逻辑时钟,每进行一次选举,都需要更新逻辑时钟// logicalclock = epochlogicalclock.incrementAndGet();// 更新选票(serverid,zxid, epoch),updateProposal(getInitId(), getInitLastLoggedZxid(),getPeerEpoch());}LOG.info("New election. My id= " + self.getId() +", proposed zxid=0x"+ Long.toHexString(proposedZxid));// 广播选票,把自己的选票发给其他服务器sendNotifications();/** Loop in which we exchangenotifications until we find a leader*/// 一轮一轮的选举直到选举成功while ((self.getPeerState() ==ServerState.LOOKING) &&(!stop)){… …}return null;} finally {… …}}
3)点击sendNotifications,广播选票,把自己的选票发给其他服务器
private void sendNotifications() {// 遍历投票参与者,给每台服务器发送选票for (long sid :self.getCurrentAndNextConfigVoters()) {QuorumVerifier qv =self.getQuorumVerifier();// 创建发送选票ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch,qv.toString().getBytes());if(LOG.isDebugEnabled()){LOG.debug("SendingNotification: " + proposedLeader + " (n.leader), 0x" +Long.toHexString(proposedZxid) + " (n.zxid), 0x" +Long.toHexString(logicalclock.get()) +" (n.round), " +sid + " (recipient), " + self.getId() +" (myid), 0x" +Long.toHexString(proposedEpoch) + " (n.peerEpoch)");}// 把发送选票放入发送队列sendqueue.offer(notmsg);}}
4)在FastLeaderElection.java类中查找WorkerSender线程。
class WorkerSender extends ZooKeeperThread {volatile boolean stop;QuorumCnxManager manager;WorkerSender(QuorumCnxManager manager){super("WorkerSender");this.stop = false;this.manager = manager;}public void run() {while (!stop) {try {// 队列阻塞,时刻准备接收要发送的选票ToSend m = sendqueue.poll(3000,TimeUnit.MILLISECONDS);if(m == null) continue;// 处理要发送的选票process(m);} catch (InterruptedException e) {break;}}LOG.info("WorkerSender isdown");}/*** Called by run() once there is a newmessage to send.* @param m message to send*/void process(ToSend m) {ByteBuffer requestBuffer =buildMsg(m.state.ordinal(),m.leader,m.zxid,m.electionEpoch,m.peerEpoch,m.configData);// 发送选票manager.toSend(m.sid, requestBuffer);}}public void toSend(Long sid,ByteBuffer b) {/** If sending message to myself, thensimply enqueue it (loopback).*/// 判断如果是发给自己的消息,直接进入自己的RecvQueueif (this.mySid == sid) {b.position(0);addToRecvQueue(new Message(b.duplicate(), sid));/** Otherwise send to the correspondingthread to send.*/} else {/** Start a new connection if doesn'thave one already.*/// 如果是发给其他服务器,创建对应的发送队列或者获取已经存在的发送队列// ,并把要发送的消息放入该队列ArrayBlockingQueue<ByteBuffer>bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);ArrayBlockingQueue<ByteBuffer>oldq = queueSendMap.putIfAbsent(sid, bq);if (oldq != null) {addToSendQueue(oldq, b);} else {addToSendQueue(bq, b);}// 将选票发送出去connectOne(sid);}}
5)如果数据是发送给自己的,添加到自己的接收队列
public void addToRecvQueue(Messagemsg) {synchronized(recvQLock) {if (recvQueue.remainingCapacity() == 0){try {recvQueue.remove();} catch (NoSuchElementException ne){// element could be removed bypoll()LOG.debug("Trying toremove from an empty " +"recvQueue. Ignoringexception " + ne);}}try {// 将发送给自己的选票添加到recvQueue队列recvQueue.add(msg);} catch (IllegalStateException ie) {// This should never happenLOG.error("Unable to insert elementin the recvQueue " + ie);}}}
6)数据添加到发送队列
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer>queue,ByteBuffer buffer) {if(queue.remainingCapacity() == 0) {try {queue.remove();} catch(NoSuchElementException ne) {//element could be removed by poll()LOG.debug("Trying to remove from an empty " +"Queue. Ignoring exception " + ne);}}try {//将要发送的消息添加到发送队列queue.add(buffer);} catch(IllegalStateException ie) {// Thisshould never happenLOG.error("Unable to insert an element in the queue " + ie);}}
7)与要发送的服务器节点建立通信连接
synchronized void connectOne(long sid){if(senderWorkerMap.get(sid) != null) {LOG.debug("There is a connection already for server " + sid);return;}synchronized (self.QV_LOCK) {boolean knownId = false;//Resolve hostname for the remote server before attempting to//connect in case the underlying ip address has changed.self.recreateSocketAddresses(sid);Map<Long, QuorumPeer.QuorumServer> lastCommittedView =self.getView();QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();Map<Long, QuorumPeer.QuorumServer> lastProposedView =lastSeenQV.getAllMembers();if(lastCommittedView.containsKey(sid)) {knownId = true;if(connectOne(sid,lastCommittedView.get(sid).electionAddr))return;}if(lastSeenQV != null && lastProposedView.containsKey(sid)&& (!knownId || (lastProposedView.get(sid).electionAddr !=lastCommittedView.get(sid).electionAddr))) {knownId = true;if(connectOne(sid,lastProposedView.get(sid).electionAddr))return;}if(!knownId) {LOG.warn("Invalid server id: " + sid);return;}}}synchronized private boolean connectOne(long sid, InetSocketAddresselectionAddr){if(senderWorkerMap.get(sid) != null) {LOG.debug("There is a connection already for server " + sid);returntrue;}Socket sock= null;try {LOG.debug("Opening channel to server " + sid);if(self.isSslQuorum()) {SSLSocket sslSock = self.getX509Util().createSSLSocket();setSockOpts(sslSock);sslSock.connect(electionAddr, cnxTO);sslSock.startHandshake();sock = sslSock;LOG.info("SSL handshake complete with {} - {} - {}",sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(),sslSock.getSession().getCipherSuite());} else{sock = new Socket();setSockOpts(sock);sock.connect(electionAddr, cnxTO);}LOG.debug("Connected to server " + sid);//Sends connection request asynchronously if the quorum// saslauthentication is enabled. This is required because// saslserver authentication process may take few seconds to//finish, this may delay next peer connection requests.if(quorumSaslAuthEnabled) {initiateConnectionAsync(sock, sid);} else{//处理连接initiateConnection(sock,sid);}returntrue;} catch(UnresolvedAddressException e) {... ...}}public void initiateConnection(final Socket sock, final Long sid) {try {startConnection(sock,sid);} catch(IOException e) {LOG.error("Exception while connecting, id: {}, addr: {}, closinglearner connection",new Object[] { sid, sock.getRemoteSocketAddress() }, e);closeSocket(sock);return;}}
8)创建并启动发送器线程和接收器线程
private boolean startConnection(Socket sock, Long sid)throws IOException {DataOutputStream dout = null;DataInputStream din = null;try {// UseBufferedOutputStream to reduce the number of IP packets. This is//important for x-DC scenarios.//通过输出流,向服务器发送数据BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());dout = newDataOutputStream(buf);//Sending id and challenge//represents protocol version (in other words - message type)dout.writeLong(PROTOCOL_VERSION);dout.writeLong(self.getId());String addr = formatInetAddr(self.getElectionAddress());byte[] addr_bytes = addr.getBytes();dout.writeInt(addr_bytes.length);dout.write(addr_bytes);dout.flush();//通过输入流读取对方发送过来的选票din = new DataInputStream(newBufferedInputStream(sock.getInputStream()));} catch(IOException e) {LOG.warn("Ignoring exception reading or writing challenge: ",e);closeSocket(sock);returnfalse;}// authenticatelearnerQuorumPeer.QuorumServer qps = self.getVotingView().get(sid);if (qps !=null) {// TODO- investigate why reconfig makes qps null.authLearner.authenticate(sock, qps.hostname);}// If lostthe challenge, then drop the new connection//如果对方的id比我的大,我是没有资格给对方发送连接请求的,直接关闭自己的客户端if (sid > self.getId()) {LOG.info("Have smaller server identifier, so dropping the " +"connection: (" + sid + ", " + self.getId() +")");closeSocket(sock);//Otherwise proceed with the connection} else {//初始化,发送器和接收器SendWorker sw = new SendWorker(sock,sid);RecvWorker rw = new RecvWorker(sock,din, sid, sw);sw.setRecv(rw);SendWorker vsw = senderWorkerMap.get(sid);if(vsw!= null)vsw.finish();senderWorkerMap.put(sid, sw);queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));//启动发送器线程和接收器线程sw.start();rw.start();return true;}return false;}
9)点击SendWorker,并查找该类下的run方法
QuorumCnxManager.java
public void run() {thread Cnt.incrementAndGet();try {/*** If there is nothing in the queue to send, then we* send the lastMessage to ensure that the last message* was received by the peer. The message could be dropped* incase self or the peer shutdown their connection* (and exit the thread) prior toreading/processing* the last message. Duplicate messages are handled correctly* by the peer.** If the send queue is non-empty, then we have a recent*message than that stored in lastMessage. To avoid sending*stale message, we should send the message in the send queue.*/Array BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);if (bq== null || isSendQueueEmpty(bq)) {ByteBuffer b = lastMessageSent.get(sid);if(b != null) {LOG.debug("Attempting to send lastMessage to sid=" + sid);send(b);}}} catch(IOException e) {LOG.error("Failedto send last message. Shutting down thread.", e);this.finish();}try {//只要连接没有断开while(running && !shutdown && sock != null) {ByteBuffer b = null;try{ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);if (bq != null) {//不断从发送队列SendQueue中,获取发送消息,并执行发送b = pollSendQueue(bq,1000, TimeUnit.MILLISECONDS);} else {LOG.error("No queue of incoming messages for " +"server" + sid);break;}if(b != null){//更新对于sid这台服务器的最近一条消息lastMessageSent.put(sid, b);//执行发送send(b);}}catch (InterruptedException e) {LOG.warn("Interrupted while waiting for message on queue",e);}}} catch(Exception e) {LOG.warn("Exception when using channel: for id " + sid+ " my id = " + QuorumCnxManager.this.mySid+ " error = " + e);}this.finish();LOG.warn("Send worker leaving thread " + " id " +sid + " my id = " + self.getId());}synchronized void send(ByteBuffer b) throws IOException {byte[] msgBytes = new byte[b.capacity()];try {b.position(0);b.get(msgBytes);} catch(BufferUnderflowException be) {LOG.error("BufferUnderflowException ", be);return;}// 输出流向外发送dout.writeInt(b.capacity());dout.write(b.array());dout.flush();}
10)点击RecvWorker,并查找该类下的run方法
QuorumCnxManager.java
public void run() {threadCnt.incrementAndGet();try {//只要连接没有断开while(running && !shutdown && sock != null) {/***Reads the first int to determine the length of the*message*/int length = din.readInt();if(length <= 0 || length > PACKETMAXSIZE) {throw new IOException("Received packetwith invalid packet: "+ length);}/***Allocates a new ByteBuffer to receive the message*/byte[] msgArray = new byte[length];//输入流接收消息din.readFully(msgArray, 0, length);ByteBuffer message = ByteBuffer.wrap(msgArray);//接收对方发送过来的选票addToRecvQueue(newMessage(message.duplicate(), sid));}} catch(Exception e) {LOG.warn("Connection broken for id " + sid + ", my id ="+ QuorumCnxManager.this.mySid + ", error = " , e);} finally {LOG.warn("Interrupting SendWorker");sw.finish();closeSocket(sock);}}public void addToRecvQueue(Message msg) {synchronized(recvQLock) {if(recvQueue.remainingCapacity() == 0) {try{recvQueue.remove();}catch (NoSuchElementException ne) {// element could be removed by poll()LOG.debug("Trying to remove from an empty " +"recvQueue. Ignoring exception " + ne);}}try {//将接收到的消息,放入接收消息队列recvQueue.add(msg);} catch(IllegalStateException ie) {//This should never happenLOG.error("Unable to insert element in the recvQueue " + ie);}}}
11)在FastLeaderElection.java类中查找WorkerReceiver线程。
class WorkerReceiver extends ZooKeeperThread {volatileboolean stop;QuorumCnxManager manager;WorkerReceiver(QuorumCnxManager manager) {super("WorkerReceiver");this.stop = false;this.manager = manager;}public void run() {Message response;while(!stop) {//Sleeps on receivetry{//从RecvQueue中取出选举投票消息(其他服务器发送过来的)response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);… …}catch (InterruptedException e) {LOG.warn("Interrupted Exception while waiting for new message"+e.toString());}}LOG.info("WorkerReceiver is down");}}


扫码关注我们
微信号|bigdata_story
B站|大数据那些事
