Source Code Analysis of ZooKeeper's FLE (FastLeaderElection) Algorithm

Java | 蘭鍋鍋誒 | 2017-03-27

When ZooKeeper servers start up, they run an election algorithm to choose a leader server from among the configured servers; the remaining servers act as followers. Three election algorithms have been implemented: FastLeaderElection, AuthFastLeaderElection, and LeaderElection, but AuthFastLeaderElection and LeaderElection are both marked @Deprecated, so the only algorithm actually in use is FastLeaderElection. All election algorithm implementations implement the Election interface:

    public interface Election {
        public Vote lookForLeader() throws InterruptedException;
        public void shutdown();
    }

This interface has two methods: lookForLeader() is the implementation of the actual election algorithm, while shutdown() performs the cleanup after the election is over, including closing the connections established between servers for the election and stopping the threads that were started to send and receive election messages.

For the basic election logic, readers can refer to the blog post "zookeeper的領導者選舉和原子廣播" (ZooKeeper leader election and atomic broadcast). It is recommended to understand that post first and then read this source code walkthrough, moving from the macro view down to the micro view.

At system initialization, every QuorumPeer object (a QuorumPeer can be understood as one ZooKeeper server that will take part in the election, i.e. one of the servers configured in zoo.cfg) maintains a FastLeaderElection object that acts on its behalf during the election. Of course, a single machine can run one or more QuorumPeers.
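
For reference, this wiring happens in QuorumPeer.createElectionAlgorithm(). The following is an abridged sketch of the electionType == 3 branch, based on the 3.4.x code base (details vary between versions):

    // Abridged sketch of QuorumPeer.createElectionAlgorithm() for electionType == 3
    // (based on the 3.4.x source; other branches and error handling omitted).
    protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;
        switch (electionAlgorithm) {
        case 3:
            qcm = new QuorumCnxManager(this);            // connection manager for election traffic
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                listener.start();                        // accept election connections from peers
                le = new FastLeaderElection(this, qcm);  // FLE talks to peers only through qcm
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }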

Messages need to be exchanged during the election, so FastLeaderElection maintains two queues:

    LinkedBlockingQueue<ToSend> sendqueue;
    LinkedBlockingQueue<Notification> recvqueue;

recvqueue holds the messages received during the election; they are handed to lookForLeader(), the core method of FastLeaderElection, which processes them to elect the leader. sendqueue holds the messages waiting to be sent out; they are picked up by the WorkerSender introduced below.
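
To make the rest of the walkthrough easier to follow, here is roughly what these two message types carry. The field lists are abridged from the 3.4.x-era source (newer versions add a protocol version and configuration data), so treat this as a sketch:

    // Rough shape of the election messages in FastLeaderElection (abridged, 3.4.x-era).
    static public class Notification {    // a vote received from another peer
        long leader;                      // sid of the proposed leader
        long zxid;                        // last zxid seen by the proposed leader
        long electionEpoch;               // logical clock of the sender's election round
        QuorumPeer.ServerState state;     // sender's state: LOOKING / FOLLOWING / LEADING / OBSERVING
        long sid;                         // sid of the sender
        long peerEpoch;                   // epoch of the proposed leader
    }

    static public class ToSend {          // this peer's vote, to be sent to another peer
        long leader;
        long zxid;
        long electionEpoch;
        QuorumPeer.ServerState state;
        long sid;                         // sid of the destination server
        long peerEpoch;
    }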

In addition, each FastLeaderElection maintains an inner class Messenger. Messenger contains two classes implementing Runnable, WorkerReceiver and WorkerSender, which, as their names suggest, are responsible for receiving and sending messages respectively: WorkerReceiver receives messages and puts them into recvqueue to await processing, while WorkerSender takes pending messages out of sendqueue and hands them to the lower-level connection manager, QuorumCnxManager, for delivery.
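
A rough sketch of WorkerSender's main loop, abridged from the 3.4.x source (buildMsg serializes the vote into its wire format):

    // Abridged sketch of FastLeaderElection.Messenger.WorkerSender (3.4.x-era).
    class WorkerSender implements Runnable {
        volatile boolean stop;
        QuorumCnxManager manager;

        public void run() {
            while (!stop) {
                try {
                    // take the next vote to broadcast, waiting at most 3 seconds
                    ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m == null) continue;
                    process(m);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }

        void process(ToSend m) {
            // serialize the vote and hand it to the connection manager
            ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader,
                    m.zxid, m.electionEpoch, m.peerEpoch);
            manager.toSend(m.sid, requestBuffer);
        }
    }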

Every QuorumPeer also holds a QuorumCnxManager object, which is responsible during the election for establishing connections between QuorumPeers and for maintaining the send and receive message queues.

    /*
     * Mapping from Peer to Thread number
     */
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

    /*
     * Reception queue
     */
    public final ArrayBlockingQueue<Message> recvQueue;

As you can see, QuorumCnxManager also has a map of send queues, queueSendMap, and a receive queue, recvQueue, together with two worker thread classes, SendWorker and RecvWorker, dedicated to sending and receiving messages respectively. Their constructors are shown below:

    SendWorker(Socket sock, Long sid) {
        super("SendWorker:" + sid);
        this.sid = sid;
        this.sock = sock;
        recvWorker = null;
        try {
            dout = new DataOutputStream(sock.getOutputStream());
        } catch (IOException e) {
            LOG.error("Unable to access socket output stream", e);
            closeSocket(sock);
            running = false;
        }
        LOG.debug("Address of remote peer: " + this.sid);
    }

    RecvWorker(Socket sock, Long sid, SendWorker sw) {
        super("RecvWorker:" + sid);
        this.sid = sid;
        this.sock = sock;
        this.sw = sw;
        try {
            din = new DataInputStream(sock.getInputStream());
            // OK to wait until socket disconnects while reading.
            sock.setSoTimeout(0);
        } catch (IOException e) {
            LOG.error("Error while accessing socket for " + sid, e);
            closeSocket(sock);
            running = false;
        }
    }

Each SendWorker and RecvWorker has a sid field: for every sid (i.e. every remote QuorumPeer) there is a dedicated SendWorker/RecvWorker pair that is responsible for sending messages to that peer and for handling the messages received from it.

The key of queueSendMap is a sid, and the value is the queue of all messages waiting to be sent to that sid.
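
The map is filled by QuorumCnxManager.toSend(), which WorkerSender calls. Roughly, and abridged from the 3.4.x source (details may differ in your version), it loops messages addressed to itself straight back into the receive queue, and otherwise enqueues the message under the target sid and makes sure a connection exists:

    // Abridged sketch of QuorumCnxManager.toSend() (3.4.x-era).
    public void toSend(Long sid, ByteBuffer b) {
        if (self.getId() == sid) {
            // message to myself: loop it back into the receive queue directly
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            // enqueue for the target sid, creating the per-sid queue if needed
            ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, bq);
            if (existing != null) {
                addToSendQueue(existing, b);
            } else {
                addToSendQueue(bq, b);
            }
            connectOne(sid);   // establish the connection to sid if it does not exist yet
        }
    }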

Now let's look at what each worker's core run() method does:

SendWorker.run():

    @Override
    public void run() {
        threadCnt.incrementAndGet();
        try {
            /**
             * If there is nothing in the queue to send, then we
             * send the lastMessage to ensure that the last message
             * was received by the peer. The message could be dropped
             * in case self or the peer shutdown their connection
             * (and exit the thread) prior to reading/processing
             * the last message. Duplicate messages are handled correctly
             * by the peer.
             *
             * If the send queue is non-empty, then we have a more recent
             * message than that stored in lastMessage. To avoid sending
             * stale message, we should send the message in the send queue.
             */
            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
            if (bq == null || isSendQueueEmpty(bq)) {
                ByteBuffer b = lastMessageSent.get(sid);
                if (b != null) {
                    LOG.debug("Attempting to send lastMessage to sid=" + sid);
                    send(b);
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to send last message. Shutting down thread.", e);
            this.finish();
        }

        try {
            while (running && !shutdown && sock != null) {
                ByteBuffer b = null;
                try {
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                    if (bq != null) {
                        b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.error("No queue of incoming messages for " +
                                "server " + sid);
                        break;
                    }
                    if (b != null) {
                        lastMessageSent.put(sid, b);
                        send(b);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting for message on queue", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception when using channel: for id " + sid + " my id = " +
                    self.getId() + " error = " + e);
        }
        this.finish();
        LOG.warn("Send worker leaving thread");
    }

RecvWorker.run():

    @Override
    public void run() {
        threadCnt.incrementAndGet();
        try {
            while (running && !shutdown && sock != null) {
                /**
                 * Reads the first int to determine the length of the
                 * message
                 */
                int length = din.readInt();
                if (length <= 0 || length > PACKETMAXSIZE) {
                    throw new IOException(
                            "Received packet with invalid packet: " + length);
                }
                /**
                 * Allocates a new ByteBuffer to receive the message
                 */
                byte[] msgArray = new byte[length];
                din.readFully(msgArray, 0, length);
                ByteBuffer message = ByteBuffer.wrap(msgArray);
                addToRecvQueue(new Message(message.duplicate(), sid));
            }
        } catch (Exception e) {
            LOG.warn("Connection broken for id " + sid + ", my id = " +
                    self.getId() + ", error = ", e);
        } finally {
            LOG.warn("Interrupting SendWorker");
            sw.finish();
            if (sock != null) {
                closeSocket(sock);
            }
        }
    }

As the code shows, SendWorker keeps reading the message queue for its sid from the global queueSendMap and sends those messages to the corresponding sid.

RecvWorker reads data from the TCP connection established with its sid and appends it to the tail of recvQueue.
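
Note that recvQueue is a bounded ArrayBlockingQueue, and addToRecvQueue() does not block when it is full; instead it discards the oldest element so that only the most recent votes are kept. A sketch of that behaviour, abridged from the 3.4.x source:

    // Abridged sketch of QuorumCnxManager.addToRecvQueue() (3.4.x-era).
    public void addToRecvQueue(Message msg) {
        synchronized (recvQLock) {
            if (recvQueue.remainingCapacity() == 0) {
                try {
                    recvQueue.remove();   // drop the oldest message to make room
                } catch (NoSuchElementException ne) {
                    // the queue may have been drained by a concurrent poll(); ignore
                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                // should not happen: capacity was just freed under recvQLock
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }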

From the run() methods of QuorumCnxManager.SendWorker and QuorumCnxManager.RecvWorker we can see that both workers simply exchange messages with their peer over the connections established by QuorumCnxManager. The messages to be sent come from FastLeaderElection, and the received messages are likewise processed by FastLeaderElection. In other words, the two QuorumCnxManager workers implement no part of the election algorithm itself; they are only proxies for sending and receiving messages. FastLeaderElection does not need to care about how to communicate with the other servers or how to obtain their votes; it only needs to take messages out of, or put messages into, the queues that QuorumCnxManager provides.
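
On the receiving side, the bridging is done by FastLeaderElection.Messenger.WorkerReceiver: it pulls raw Messages out of QuorumCnxManager's recvQueue, parses them into Notifications and puts them into FastLeaderElection's recvqueue. The following is a heavily simplified sketch of that loop; the real method also validates the sender, handles older protocol versions and replies with the current vote to peers that are no longer LOOKING:

    // Heavily simplified sketch of Messenger.WorkerReceiver.run() (3.4.x-era);
    // sender validation, observer handling and replies to non-LOOKING peers are omitted.
    public void run() {
        while (!stop) {
            try {
                // take a raw message from the connection layer, waiting at most 3 seconds
                Message response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                if (response == null) continue;

                // parse the wire format into a Notification
                ByteBuffer b = response.buffer;
                int stateOrdinal = b.getInt();      // sender's state (LOOKING / FOLLOWING / ...)
                Notification n = new Notification();
                n.leader = b.getLong();
                n.zxid = b.getLong();
                n.electionEpoch = b.getLong();
                n.peerEpoch = b.getLong();
                n.state = QuorumPeer.ServerState.values()[stateOrdinal];
                n.sid = response.sid;

                // hand the vote to lookForLeader() if this peer is still electing
                if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                    recvqueue.offer(n);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }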
