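Source Code Walkthrough Series: A Source-Level, Illustrated Guide to the ZooKeeper Election Mechanism
ZooKeeper Election Overview
SID: server ID. It uniquely identifies a machine in the ZooKeeper cluster; no two machines may share one, and it is the same value as myid.
ZXID: transaction ID. It identifies one change of server state. At any given moment the ZXID values of the machines in the cluster are not necessarily identical, which follows from how ZooKeeper servers process client update requests.
Epoch: the identifier of a Leader's term. While there is no Leader, every vote cast in the same round carries the same logical clock value, and that value increases each time a round of voting completes.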
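On first startup, the ZooKeeper election mechanism works as shown in the figure below (a cluster of five servers):
(1) Server 1 starts and initiates an election. Server 1 votes for itself. It now has one vote, short of the majority (3 votes), so the election cannot complete and server 1 remains in the LOOKING state;
(2) Server 2 starts and initiates another election. Servers 1 and 2 each vote for themselves and exchange votes: server 1 finds that server 2's myid is larger than that of its current choice (server 1) and changes its vote to server 2. Server 1 now has 0 votes and server 2 has 2 votes, still no majority, so the election cannot complete and servers 1 and 2 remain LOOKING;
(3) Server 3 starts and initiates an election. Servers 1 and 2 both change their votes to server 3. Result: server 1 has 0 votes, server 2 has 0 votes, and server 3 has 3 votes. Server 3 now holds more than half of the votes and is elected Leader. Servers 1 and 2 change their state to FOLLOWING and server 3 changes its state to LEADING;
(4) Server 4 starts and initiates an election. Servers 1, 2 and 3 are no longer LOOKING and do not change their votes. After the exchange, server 3 has 3 votes and server 4 has 1 vote. Server 4 defers to the majority, changes its vote to server 3, and changes its state to FOLLOWING;
(5) Server 5 starts and, like server 4, becomes a follower.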
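The five steps above can be sketched as a tiny simulation. This is a simplified model, not ZooKeeper code: it assumes every server has the same epoch and ZXID (a fresh cluster), so only myid decides, and the class and method names are hypothetical.

```java
// Hypothetical demo class; a simplified model, not ZooKeeper code.
final class FirstStartupDemo {
    /**
     * Starts servers 1..clusterSize one at a time and returns the myid of
     * the server that becomes Leader, or -1 if no majority is ever reached.
     * Assumes equal epoch and ZXID everywhere, so only myid matters.
     */
    static int electOnFirstStartup(int clusterSize) {
        int leader = -1;
        for (int started = 1; started <= clusterSize; started++) {
            if (leader != -1) {
                // A Leader already exists: the newcomer simply follows it.
                System.out.println("server " + started + " -> FOLLOWING " + leader);
                continue;
            }
            // All LOOKING servers converge on the largest myid seen so far,
            // which is the server that just started; it collects `started` votes.
            int votes = started;
            if (votes > clusterSize / 2) {
                leader = started;
                System.out.println("server " + started + " -> LEADING with " + votes + " votes");
            } else {
                System.out.println("server " + started + " has " + votes + " votes, still LOOKING");
            }
        }
        return leader;
    }

    public static void main(String[] args) {
        System.out.println("leader = " + electOnFirstStartup(5));
    }
}
```

With five servers the model elects server 3, matching step (3); servers 4 and 5 arrive after a Leader exists and go straight to FOLLOWING.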
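When it is not the first startup, the election proceeds as follows:
(1) A server in the ZooKeeper cluster enters Leader election in either of the following two situations:
• The server is starting up (initialization).
• While running, the server cannot maintain its connection to the Leader.
(2) When a machine enters the Leader election process, the cluster may be in one of two states:
• The cluster already has a Leader.
In this case, when the machine tries to elect a Leader it is told who the current Leader is; it only needs to connect to the Leader and synchronize its state.
• The cluster really has no Leader.
Suppose ZooKeeper consists of 5 servers with SIDs 1, 2, 3, 4 and 5 and ZXIDs 8, 8, 8, 7 and 7, and the server with SID 3 is the Leader. At some moment servers 3 and 5 fail, so a Leader election begins.
Votes cast by the machines with SIDs 1, 2 and 4: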
(EPOCH,ZXID,SID) | (EPOCH,ZXID,SID) | (EPOCH,ZXID,SID) |
(1,8,1) | (1,8,2) | (1,7,4) |
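Leader election rules:
① The vote with the larger EPOCH wins outright
② If the EPOCHs are equal, the larger transaction ID (ZXID) wins
③ If the transaction IDs are equal, the larger server ID (SID) wins
Applied to the three votes above: the EPOCHs are all 1, servers 1 and 2 tie on ZXID 8 (ahead of server 4's 7), and server 2 has the larger SID, so server 2 wins.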
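The three rules amount to a lexicographic comparison of (EPOCH, ZXID, SID). The sketch below is only an illustration of that ordering; the Vote class here is a hypothetical stand-in, not ZooKeeper's own Vote type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration class; not part of ZooKeeper.
final class VoteOrderDemo {
    // A vote written the way the article writes it: (EPOCH, ZXID, SID).
    static final class Vote {
        final long epoch;
        final long zxid;
        final long sid;

        Vote(long epoch, long zxid, long sid) {
            this.epoch = epoch;
            this.zxid = zxid;
            this.sid = sid;
        }
    }

    // Rule 1: larger epoch; rule 2: larger zxid; rule 3: larger sid.
    static final Comparator<Vote> ORDER =
            Comparator.comparingLong((Vote v) -> v.epoch)
                    .thenComparingLong(v -> v.zxid)
                    .thenComparingLong(v -> v.sid);

    // Returns the vote that beats all others under the three rules.
    static Vote winner(List<Vote> votes) {
        return Collections.max(votes, ORDER);
    }

    public static void main(String[] args) {
        List<Vote> votes = Arrays.asList(
                new Vote(1, 8, 1), new Vote(1, 8, 2), new Vote(1, 7, 4));
        System.out.println("winner sid = " + winner(votes).sid);
    }
}
```

Because the comparison is lexicographic, a higher epoch beats any ZXID, and SID only matters as the final tie-breaker.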
ZK Election Flow Analysis
Election Preparation
Overview of the election preparation source code:
QuorumPeer.java
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// Prepare for the election
startLeaderElection();
super.start();
}
synchronized public void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
// Create the vote
// (1) Vote components: epoch (the Leader's term number), zxid (the ID of a transaction executed during a Leader's term), myid (server ID)
// (2) When voting begins, every server first votes for itself
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// if (!getView().containsKey(myid)) {
// throw new RuntimeException("My id " + myid + " not in the peer list");
//}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(getQuorumAddress().getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
// Create the election algorithm instance
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this,true);
break;
case 3:
// 1. Create the QuorumCnxManager, which handles all network communication during the election
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// 2. Start the listener thread
listener.start();
// 3. Get ready to start the election
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
1) Initializing the network communication component
public QuorumCnxManager createCnxnManager() {
return new QuorumCnxManager(this,
this.getId(),
this.getView(),
this.authServer,
this.authLearner,
this.tickTime * this.syncLimit,
this.getQuorumListenOnAllIPs(),
this.quorumCnxnThreadsSize,
this.isQuorumSaslAuthEnabled());
}
public QuorumCnxManager(QuorumPeer self,
final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled) {
// Create the queues and maps
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.self = self;
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");
}
2) Initializing the listener thread
Open QuorumCnxManager.Listener and find its run method
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
Exception exitException = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
try {
if (self.shouldUsePortUnification()) {
LOG.info("Creating TLS-enabled quorum server socket");
ss = new UnifiedServerSocket(self.getX509Util(), true);
} else if (self.isSslQuorum()) {
LOG.info("Creating TLS-only quorum server socket");
ss = new UnifiedServerSocket(self.getX509Util(), false);
} else {
ss = new ServerSocket();
}
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) {
int port = self.getElectionAddress().getPort();
addr = new InetSocketAddress(port);
} else {
// Resolve hostname for this server in case the
// underlying ip address has changed.
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
}
LOG.info("My election bind port: " + addr.toString());
setName(addr.toString());
// Bind the server address
ss.bind(addr);
// Loop until shutdown
while (!shutdown) {
try {
// Block until a connection request arrives
client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
}
numRetries = 0;
} catch (SocketTimeoutException e) {
LOG.warn("The socket is listening for the election accepted "
+ "and it timed out unexpectedly, but will retry."
+ "see ZOOKEEPER-2836");
}
}
} catch (IOException e) {
... ...
closeSocket(client);
}
}
... ...
}
3) Election preparation
Open FastLeaderElection
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
// Initialize the queues and the messenger
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
Election Execution
Overview of the election execution source code:
QuorumPeer.java
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// Cold-start data recovery
loadDataBase();
// Start the server connection factory
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// Prepare the election environment
startLeaderElection();
// Run the election
super.start();
}
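1) Calling super.start() starts the QuorumPeer thread (QuorumPeer extends ZooKeeperThread), which amounts to executing the run() method of the QuorumPeer.java class
After ZooKeeper starts, every server is first in the LOOKING state. The election then makes one server the Leader and the remaining servers Followers.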
QuorumPeer.java
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s:getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// Run the election; when it ends, the winning vote (the one for the elected Leader) is returned
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE =false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
... ...
}
}
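The relationship described in step 1), where start() does its preparation and then super.start() causes run() to execute on a new thread, can be illustrated with a very small stand-in. PeerLoopDemo and its two states are hypothetical and only mimic the shape of the main loop above; the real run() also handles OBSERVING and LEADING and never exits on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for QuorumPeer; names are illustrative only.
final class PeerLoopDemo extends Thread {
    enum State { LOOKING, FOLLOWING }

    final List<String> trace = Collections.synchronizedList(new ArrayList<String>());
    private volatile State state = State.LOOKING;
    private volatile boolean running = true;

    @Override
    public void start() {
        trace.add("prepare election"); // stands in for startLeaderElection()
        super.start();                 // spawns the thread, which then calls run()
    }

    @Override
    public void run() {
        while (running) {
            switch (state) {
                case LOOKING:
                    trace.add("LOOKING");
                    state = State.FOLLOWING; // pretend the election chose another server
                    break;
                case FOLLOWING:
                    trace.add("FOLLOWING");
                    running = false;         // end the demo instead of following forever
                    break;
            }
        }
    }

    // Starts a peer, waits for its loop to finish, and returns what it did.
    static List<String> runDemo() {
        PeerLoopDemo peer = new PeerLoopDemo();
        peer.start();
        try {
            peer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(peer.trace);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```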
2) Use Ctrl+Alt+B on lookForLeader() to jump to its implementation class, FastLeaderElection.java
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
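// During a normal startup, every other server sends me a vote
// Holds the latest valid vote received from each server
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// Holds votes that arrive from outside the current election
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// Wait time used while collecting votes; it starts at finalizeWait, 0.2 s by default
int notTimeout = finalizeWait;
// Each new election round increments logicalclock
// Until valid epoch data is available, the logical clock is used in its place
// Leader election rule: compare epoch (term), zxid (transaction ID) and serverid (myid) in that order; the larger value wins
synchronized(this){
// Update the logical clock; this must happen for every election
// logicalclock = epoch
logicalclock.incrementAndGet();
// Update the proposal (serverid, zxid, epoch)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Broadcast the vote: send my own vote to the other servers
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
// Keep running election rounds until one succeeds
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){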
… …
}
return null;
} finally {
… …
}
}
3) Open sendNotifications, which broadcasts the vote by sending this server's vote to the other servers
private void sendNotifications() {
// Iterate over the voting participants and send the vote to each server
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
// Build the vote message to send
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// Put the vote message on the send queue
sendqueue.offer(notmsg);
}
}
4) Find the WorkerSender thread in the FastLeaderElection.java class.
class WorkerSender extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;
WorkerSender(QuorumCnxManager manager){
super("WorkerSender");
this.stop = false;
this.manager = manager;
}
public void run() {
while (!stop) {
try {
// Block on the queue, waiting for a vote to send
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if (m == null) continue;
// Process the vote to be sent
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
/**
* Called by run() once there is a new message to send.
* @param m message to send
*/
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
// Send the vote
manager.toSend(m.sid, requestBuffer);
}
}
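The WorkerSender loop is a standard consumer: poll the queue with a timeout so the stop flag is rechecked regularly, skip empty polls, and process whatever arrives. A minimal sketch of that pattern, assuming String messages and a hypothetical SenderLoopDemo class (the 100 ms timeout is chosen only to keep the demo quick):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the poll-with-timeout consumer loop; not ZooKeeper code.
final class SenderLoopDemo {
    private final LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<String>();
    private final List<String> processed = new CopyOnWriteArrayList<String>();
    private volatile boolean stop = false;

    private final Thread worker = new Thread(() -> {
        while (!stop) {
            try {
                // Wake up periodically so a raised stop flag is noticed.
                String m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                processed.add("sent:" + m); // stands in for process(m)
            } catch (InterruptedException e) {
                break;
            }
        }
    }, "WorkerSenderSketch");

    // Queues the messages, waits until the worker has handled them, then stops it.
    static List<String> sendAll(String... messages) {
        SenderLoopDemo demo = new SenderLoopDemo();
        demo.worker.start();
        for (String m : messages) {
            demo.sendqueue.offer(m);
        }
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (demo.processed.size() < messages.length
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            demo.stop = true;
            demo.worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(demo.processed);
    }

    public static void main(String[] args) {
        System.out.println(sendAll("vote-for-1", "vote-for-2"));
    }
}
```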
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
// A message addressed to this server goes straight into its own RecvQueue
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
// For any other server, create its send queue (or fetch the existing one)
// and put the message on that queue
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
// Make sure a connection to that server exists; its SendWorker thread does the actual sending
connectOne(sid);
}
}
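The routing decision in toSend can be reduced to a few lines: loop back messages addressed to yourself, otherwise append to a per-peer queue that is created on first use. The sketch below swaps the putIfAbsent idiom from the source for computeIfAbsent, which avoids allocating a throwaway queue; RoutingDemo, its CAPACITY value and the String payloads are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of loopback vs. per-peer queue routing; not ZooKeeper code.
final class RoutingDemo {
    static final int CAPACITY = 8; // illustrative value only

    final long mySid;
    final ArrayBlockingQueue<String> recvQueue = new ArrayBlockingQueue<String>(CAPACITY);
    final ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap =
            new ConcurrentHashMap<Long, ArrayBlockingQueue<String>>();

    RoutingDemo(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, String msg) {
        if (sid == mySid) {
            // Loopback: my own vote goes directly to my receive queue.
            recvQueue.offer(msg);
            return;
        }
        // Create the peer's queue on first use, then enqueue the message.
        queueSendMap
                .computeIfAbsent(sid, k -> new ArrayBlockingQueue<String>(CAPACITY))
                .offer(msg);
    }

    // Number of messages waiting to be sent to the given peer.
    int pendingFor(long sid) {
        ArrayBlockingQueue<String> q = queueSendMap.get(sid);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        RoutingDemo r = new RoutingDemo(1L);
        r.toSend(1L, "my own vote");
        r.toSend(2L, "vote for peer 2");
        System.out.println(r.recvQueue.size() + " looped back, "
                + r.pendingFor(2L) + " pending for sid 2");
    }
}
```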
5) If the message is addressed to this server, add it to its own receive queue
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"recvQueue. Ignoring exception " + ne);
}
}
try {
// Add the vote addressed to this server to recvQueue
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
6) Adding the message to the send queue
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
ByteBuffer buffer) {
if (queue.remainingCapacity() == 0) {
try {
queue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"Queue. Ignoring exception " + ne);
}
}
try {
// Add the message to be sent to the send queue
queue.add(buffer);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert an element in the queue " + ie);
}
}
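Both addToRecvQueue and addToSendQueue follow the same policy: when the bounded queue is full, discard the oldest element so that the newest message always gets in. A minimal sketch of that policy, with a hypothetical helper name and a capacity of 1 picked purely to make the effect visible:

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo of the "drop oldest when full" policy; not ZooKeeper code.
final class DropOldestDemo {
    static <T> void addDroppingOldest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            try {
                queue.remove(); // make room by discarding the head
            } catch (NoSuchElementException ignored) {
                // A concurrent poll() emptied the queue first; nothing to drop.
            }
        }
        try {
            queue.add(item);
        } catch (IllegalStateException e) {
            // Only reachable if another producer refilled the queue in between.
            System.err.println("queue unexpectedly full: " + e);
        }
    }

    // Adds two votes to a one-slot queue and returns the survivor.
    static String demo() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
        addDroppingOldest(q, "vote-A");
        addDroppingOldest(q, "vote-B");
        return q.peek();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

For election traffic this is a sensible trade-off: a newer vote supersedes an older one, so losing the stale entry costs nothing.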
7) Establishing a connection to the destination server node
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server " + sid);
return;
}
synchronized (self.QV_LOCK) {
boolean knownId = false;
// Resolve hostname for the remote server before attempting to
// connect in case the underlying ip address has changed.
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
return;
}
if (lastSeenQV != null && lastProposedView.containsKey(sid)
&& (!knownId || (lastProposedView.get(sid).electionAddr !=
lastCommittedView.get(sid).electionAddr))) {
knownId = true;
if (connectOne(sid, lastProposedView.get(sid).electionAddr))
return;
}
if (!knownId) {
LOG.warn("Invalid server id: " + sid);
return;
}
}
}
synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server " + sid);
return true;
}
Socket sock = null;
try {
LOG.debug("Opening channel to server " + sid);
if (self.isSslQuorum()) {
SSLSocket sslSock = self.getX509Util().createSSLSocket();
setSockOpts(sslSock);
sslSock.connect(electionAddr, cnxTO);
sslSock.startHandshake();
sock = sslSock;
LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
} else {
sock = new Socket();
setSockOpts(sock);
sock.connect(electionAddr, cnxTO);
}
LOG.debug("Connected to server " + sid);
// Sends connection request asynchronously if the quorum
// sasl authentication is enabled. This is required because
// sasl server authentication process may take few seconds to
// finish, this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
initiateConnectionAsync(sock, sid);
} else {
// Handle the connection
initiateConnection(sock, sid);
}
return true;
} catch (UnresolvedAddressException e) {
... ...
}
}
public void initiateConnection(final Socket sock, final Long sid) {
try {
startConnection(sock,sid);
} catch (IOException e) {
LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
new Object[] { sid, sock.getRemoteSocketAddress() }, e);
closeSocket(sock);
return;
}
}
8) Creating and starting the sender and receiver threads
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
// Send data to the remote server through the output stream
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = formatInetAddr(self.getElectionAddress());
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
// Input stream used to read the votes the remote server sends
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
// If the remote id is larger than mine, I am not entitled to initiate the connection, so close my own client socket
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
// Initialize the sender and the receiver
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
// Start the sender thread and the receiver thread
sw.start();
rw.start();
return true;
}
return false;
}
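startConnection does two things worth isolating: it writes a small handshake (protocol version, own server id, length-prefixed election address), and it applies the rule that the initiator keeps the connection only when the remote id is not larger than its own. The sketch below reproduces both against in-memory streams. HandshakeDemo, the version constant's value and the sample address are assumptions for illustration, and the decoder is a hypothetical counterpart that the listing above does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the handshake layout and the id tie-break; not ZooKeeper code.
final class HandshakeDemo {
    static final long PROTOCOL_VERSION = -65536L; // placeholder value for the sketch

    // Layout: long version, long sid, int address length, address bytes.
    static byte[] encode(long mySid, String electionAddr) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(mySid);
            byte[] addr = electionAddr.getBytes(StandardCharsets.UTF_8);
            dout.writeInt(addr.length);
            dout.write(addr);
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order and renders them as text.
    static String decode(byte[] wire) {
        try {
            DataInputStream din = new DataInputStream(new ByteArrayInputStream(wire));
            long version = din.readLong();
            long sid = din.readLong();
            byte[] addr = new byte[din.readInt()];
            din.readFully(addr);
            return version + "|" + sid + "|" + new String(addr, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors `if (sid > self.getId()) closeSocket(sock)`: drop when the remote id is larger.
    static boolean initiatorKeepsConnection(long mySid, long remoteSid) {
        return !(remoteSid > mySid);
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(3L, "host:3888")));
        System.out.println("3 -> 1 keeps: " + initiatorKeepsConnection(3L, 1L));
        System.out.println("1 -> 3 keeps: " + initiatorKeepsConnection(1L, 3L));
    }
}
```

The tie-break means that between any two servers only the one with the larger id ends up owning the outgoing connection, so a pair never keeps two sockets open to each other.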
9) Open SendWorker and find its run method
QuorumCnxManager.java
public void run() {
threadCnt.incrementAndGet();
try {
/**
* If there is nothing in the queue to send, then we
* send the lastMessage to ensure that the last message
* was received by the peer. The message could be dropped
* in case self or the peer shutdown their connection
* (and exit the thread) prior to reading/processing
* the last message. Duplicate messages are handled correctly
* by the peer.
*
* If the send queue is non-empty, then we have a recent
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
// As long as the connection is still open
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
// Keep taking messages from the send queue and sending them
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
if (b != null) {
// Record this as the most recent message for server sid
lastMessageSent.put(sid, b);
// Send it
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch(Exception e) {
LOG.warn("Exception when using channel: for id " + sid
+ " my id = " + QuorumCnxManager.this.mySid
+ " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
b.position(0);
b.get(msgBytes);
} catch(BufferUnderflowException be) {
LOG.error("BufferUnderflowException ", be);
return;
}
// Write the message to the output stream
dout.writeInt(b.capacity());
dout.write(b.array());
dout.flush();
}
10) Open RecvWorker and find its run method
QuorumCnxManager.java
public void run() {
threadCnt.incrementAndGet();
try {
// As long as the connection is still open
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
byte[] msgArray = new byte[length];
// Read the message from the input stream
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
// Queue the vote received from the remote server
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = "
+ QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
closeSocket(sock);
}
}
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
if(recvQueue.remainingCapacity() == 0) {
try{
recvQueue.remove();
}catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"recvQueue. Ignoring exception " + ne);
}
}
try {
// Put the received message on the receive queue
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
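SendWorker.send and RecvWorker.run together implement length-prefixed framing: the sender writes a 4-byte length followed by the payload, and the receiver reads the length, validates it, then reads exactly that many bytes. A self-contained sketch over in-memory streams follows; FramingDemo and the MAX_FRAME bound are assumptions for illustration, not ZooKeeper's constants.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of length-prefixed framing; not ZooKeeper code.
final class FramingDemo {
    static final int MAX_FRAME = 512 * 1024; // illustrative upper bound

    // Sender side: length first, then the bytes.
    static void writeFrame(DataOutputStream dout, byte[] payload) throws IOException {
        dout.writeInt(payload.length);
        dout.write(payload);
        dout.flush();
    }

    // Receiver side: validate the length before allocating and reading.
    static byte[] readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_FRAME) {
            throw new IOException("invalid frame length: " + length);
        }
        byte[] payload = new byte[length];
        din.readFully(payload, 0, length);
        return payload;
    }

    // Writes all messages to one byte stream, reads them back, joins with commas.
    static String roundTrip(String... messages) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(wire);
            for (String m : messages) {
                writeFrame(dout, m.getBytes(StandardCharsets.UTF_8));
            }
            DataInputStream din =
                    new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
            StringBuilder out = new StringBuilder();
            for (int i = 0; i < messages.length; i++) {
                if (i > 0) out.append(',');
                out.append(new String(readFrame(din), StandardCharsets.UTF_8));
            }
            return out.toString();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("vote-1", "vote-22"));
    }
}
```

Checking the length before allocating matters on a socket: a corrupt or hostile prefix would otherwise trigger an arbitrarily large allocation.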
11) Find the WorkerReceiver thread in the FastLeaderElection.java class.
class WorkerReceiver extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;
WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
// Take an election vote message (sent by another server) from RecvQueue
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
… …
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}