When ZooKeeper servers start up, they run an election algorithm to choose a leader from among the participating servers; the remaining servers become followers. Three election algorithms are implemented: FastLeaderElection, AuthFastLeaderElection, and LeaderElection, but AuthFastLeaderElection and LeaderElection are both marked @Deprecated, so the only algorithm actually in use is FastLeaderElection. All election implementations implement the Election interface:
public interface Election {
    public Vote lookForLeader() throws InterruptedException;
    public void shutdown();
}
The interface has two methods: lookForLeader() holds the actual election algorithm, while shutdown() performs post-election cleanup, closing the inter-server connections that were established for the election and stopping the message sender and receiver threads created for it.
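The contract can be illustrated with a minimal, self-contained sketch. The Vote fields and the FixedLeaderElection class below are invented for illustration; they are not ZooKeeper's real classes, only stand-ins showing how a caller would use the interface:

```java
// Illustrative sketch only: Vote and FixedLeaderElection are simplified
// stand-ins, not ZooKeeper's actual implementations.
interface Election {
    Vote lookForLeader() throws InterruptedException;
    void shutdown();
}

class Vote {
    final long leaderId;
    Vote(long leaderId) { this.leaderId = leaderId; }
}

class FixedLeaderElection implements Election {
    private final long myId;
    FixedLeaderElection(long myId) { this.myId = myId; }
    // Trivial stand-in for the real algorithm: always "elects" server 1.
    public Vote lookForLeader() { return new Vote(1L); }
    public void shutdown() { /* close connections, stop worker threads */ }
}

public class ElectionSketch {
    public static void main(String[] args) throws InterruptedException {
        Election election = new FixedLeaderElection(2L);
        Vote v = election.lookForLeader();
        System.out.println("elected leader sid=" + v.leaderId);
        election.shutdown();
    }
}
```

In the real code, the caller is QuorumPeer: it invokes lookForLeader() to obtain a Vote, then uses shutdown() to tear down the election machinery.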
For the basic election logic, see the earlier post "ZooKeeper leader election and atomic broadcast"; readers are encouraged to understand that overview first and then return to this source-level walkthrough, moving from the macro view to the micro.
At system initialization, every QuorumPeer object (a QuorumPeer can be understood as one ZooKeeper server preparing to take part in the election, i.e., a server listed in the zoo.cfg configuration file) maintains a FastLeaderElection object that acts on its behalf during the election. A single machine may, of course, run one or more QuorumPeers.
Messages must be exchanged during the election, so FastLeaderElection maintains two queues:
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
recvqueue holds the messages received during the election; they are handed to FastLeaderElection's core method, lookForLeader(), which processes them to elect the leader. sendqueue holds outgoing messages, which are processed by the WorkerSender introduced next.
Each FastLeaderElection also maintains an inner class, Messenger, which contains two classes implementing Runnable: WorkerReceiver and WorkerSender. As the names suggest, they handle receiving and sending respectively: WorkerReceiver receives messages and places them into recvqueue to await processing, while WorkerSender takes pending messages from sendqueue and hands them to the lower-level connection manager, QuorumCnxManager, for delivery.
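The hand-off through the two queues can be replayed in isolation. In this sketch the sender thread plays the role of WorkerSender, and "delivery" is simulated by looping the message straight back into recvqueue; in the real code, QuorumCnxManager carries it over the network to the peer:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TwoQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();

        // A WorkerSender-like thread: drain sendqueue and "deliver" each
        // message; here delivery just loops it back into recvqueue.
        Thread sender = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) recvqueue.put(m);
                }
            } catch (InterruptedException ignored) { }
        });
        sender.start();

        // The election logic only ever touches the queues.
        sendqueue.put("vote:sid=1,zxid=0");
        String received = recvqueue.poll(1, TimeUnit.SECONDS);
        System.out.println("received: " + received);

        sender.interrupt();
        sender.join();
    }
}
```

This is the whole point of the design: lookForLeader() produces and consumes queue entries and never touches a socket.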
Every QuorumPeer owns a QuorumCnxManager object responsible for establishing the connections between QuorumPeers during the election and for maintaining the send and receive message queues.
/*
 * Mapping from Peer to Thread number
 */
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

/*
 * Reception queue
 */
public final ArrayBlockingQueue<Message> recvQueue;
As the fields show, QuorumCnxManager also has a send-queue map (queueSendMap) and a receive queue (recvQueue), along with two thread classes implementing Runnable, SendWorker and RecvWorker, responsible for sending and receiving messages respectively. Their constructors are as follows:
SendWorker(Socket sock, Long sid) {
    super("SendWorker:" + sid);
    this.sid = sid;
    this.sock = sock;
    recvWorker = null;
    try {
        dout = new DataOutputStream(sock.getOutputStream());
    } catch (IOException e) {
        LOG.error("Unable to access socket output stream", e);
        closeSocket(sock);
        running = false;
    }
    LOG.debug("Address of remote peer: " + this.sid);
}
RecvWorker(Socket sock, Long sid, SendWorker sw) {
    super("RecvWorker:" + sid);
    this.sid = sid;
    this.sock = sock;
    this.sw = sw;
    try {
        din = new DataInputStream(sock.getInputStream());
        // OK to wait until socket disconnects while reading.
        sock.setSoTimeout(0);
    } catch (IOException e) {
        LOG.error("Error while accessing socket for " + sid, e);
        closeSocket(sock);
        running = false;
    }
}
Each SendWorker or RecvWorker carries a sid field: for every remote sid there is a dedicated SendWorker/RecvWorker pair responsible for sending messages to that peer and for handling the messages received from it.
The key of queueSendMap is a sid, and the value is the queue of messages waiting to be sent to that sid.
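A standalone sketch of this per-sid map. It assumes a capacity-1, replace-the-stale-message policy (which matches what the worker code below suggests, since only the freshest vote matters during an election); the helper name queueFor is invented for this example:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class PerSidQueueSketch {
    // Assumption for this sketch: one pending message per peer is enough,
    // because a newer vote supersedes an older one.
    static final int CAPACITY = 1;

    static final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>
            queueSendMap = new ConcurrentHashMap<>();

    // Enqueue a message for a given sid, creating the queue on first use
    // and dropping a stale message so the newest one wins.
    static void queueFor(long sid, ByteBuffer msg) {
        ArrayBlockingQueue<ByteBuffer> q = queueSendMap.computeIfAbsent(
                sid, k -> new ArrayBlockingQueue<>(CAPACITY));
        if (!q.offer(msg)) {
            q.poll();       // discard the stale message
            q.offer(msg);   // keep the freshest one
        }
    }

    public static void main(String[] args) {
        queueFor(3L, ByteBuffer.wrap("old vote".getBytes(StandardCharsets.UTF_8)));
        queueFor(3L, ByteBuffer.wrap("new vote".getBytes(StandardCharsets.UTF_8)));
        ByteBuffer b = queueSendMap.get(3L).poll();
        System.out.println(new String(b.array(), StandardCharsets.UTF_8));
    }
}
```

The SendWorker for sid 3 would then drain exactly this queue, which is what the run() methods below show.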
Now look at what the core run() methods of these two workers do.
SendWorker.run():
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        /**
         * If there is nothing in the queue to send, then we
         * send the lastMessage to ensure that the last message
         * was received by the peer. The message could be dropped
         * in case self or the peer shutdown their connection
         * (and exit the thread) prior to reading/processing
         * the last message. Duplicate messages are handled correctly
         * by the peer.
         *
         * If the send queue is non-empty, then we have a recent
         * message than that stored in lastMessage. To avoid sending
         * stale message, we should send the message in the send queue.
         */
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid=" + sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }

    try {
        while (running && !shutdown && sock != null) {
            ByteBuffer b = null;
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }
                if (b != null) {
                    lastMessageSent.put(sid, b);
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue",
                         e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid + " my id = " +
                 self.getId() + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread");
}
RecvWorker.run():
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = " +
                 self.getId() + ", error = ", e);
    } finally {
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        if (sock != null) {
            closeSocket(sock);
        }
    }
}
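RecvWorker's length-prefixed framing can be demonstrated end to end with in-memory streams. PACKETMAXSIZE here is an illustrative constant, not ZooKeeper's actual value, and the helper names frame/unframe are invented:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
    static final int PACKETMAXSIZE = 1024 * 512; // illustrative limit

    // Write a message as [int length][payload bytes], the way the sender
    // side puts it on the wire.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bos);
        dout.writeInt(payload.length);
        dout.write(payload);
        return bos.toByteArray();
    }

    // Read it back the way RecvWorker.run() does: length first, then
    // exactly that many bytes via readFully.
    static ByteBuffer unframe(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        int length = din.readInt();
        if (length <= 0 || length > PACKETMAXSIZE) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msgArray = new byte[length];
        din.readFully(msgArray, 0, length);
        return ByteBuffer.wrap(msgArray);
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = frame("notification".getBytes(StandardCharsets.UTF_8));
        ByteBuffer msg = unframe(new ByteArrayInputStream(wire));
        System.out.println(new String(msg.array(), StandardCharsets.UTF_8));
    }
}
```

readFully is the important call: unlike a plain read, it blocks until the entire message body has arrived, so a notification is never handed to the election logic half-read.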
As the code shows, SendWorker continually reads, from the global queueSendMap, the message queue for the sid it is responsible for, and sends those messages to that sid.
RecvWorker reads data from the TCP connection established with its sid and appends it to the end of recvQueue.
From the run() methods of QuorumCnxManager.SendWorker and QuorumCnxManager.RecvWorker we can see that both workers simply exchange messages with their peer over the connections QuorumCnxManager has established. The messages to be sent come from FastLeaderElection, and the received messages are likewise processed by FastLeaderElection. The two workers therefore implement no election logic themselves; they are just proxies for message transport. FastLeaderElection never needs to care how to communicate with other servers or how to obtain their votes; it only takes messages from, or puts messages into, the queues that QuorumCnxManager provides.