'Kafka面試,看這一篇文章就夠了,文末有福利'

算法 人生第一份工作 文章 設計 物理 程序員隱匿者 2019-08-14
"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

Kafka面試,看這一篇文章就夠了,文末有福利

新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不立即轉化到PreparingRebalance狀態來開啟reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改為PreparingRebalance(開啟reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啟動,那麼group狀態先轉化為InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換為PreparingRebalance(開啟reblance)

Broker設計原理

Broker 是Kafka 集群中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集群節點的管理等。由於涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享

broker zk註冊

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

Kafka面試,看這一篇文章就夠了,文末有福利

新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不立即轉化到PreparingRebalance狀態來開啟reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改為PreparingRebalance(開啟reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啟動,那麼group狀態先轉化為InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換為PreparingRebalance(開啟reblance)

Broker設計原理

Broker 是Kafka 集群中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集群節點的管理等。由於涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享

broker zk註冊

Kafka面試,看這一篇文章就夠了,文末有福利

broker消息存儲

  • Kafka的消息以二進制的方式緊湊地存儲,節省了很大空間
  • 此外消息存在ByteBuffer而不是堆,這樣broker進程掛掉時,數據不會丟失,同時避免了gc問題
  • 通過零拷貝和順序尋址,讓消息存儲和讀取速度都非常快
  • 處理fetch請求的時候通過zero-copy 加快速度

broker狀態數據

broker設計中,每臺機器都保存了相同的狀態數據。主要包括以下:

  • controller所在的broker ID,即保存了當前集群中controller是哪臺broker
  • 集群中所有broker的信息:比如每臺broker的ID、機架信息以及配置的若干組連接信息
  • 集群中所有節點的信息:嚴格來說,它和上一個有些重複,不過此項是按照broker ID和監聽器類型進行分組的。對於超大集群來說,使用這一項緩存可以快速地定位和查找給定節點信息,而無需遍歷上一項中的內容,算是一個優化吧
  • 集群中所有分區的信息:所謂分區信息指的是分區的leader、ISR和AR信息以及當前處於offline狀態的副本集合。這部分數據按照topic-partitionID進行分組,可以快速地查找到每個分區的當前狀態。(注:AR表示assigned replicas,即創建topic時為該分區分配的副本集合)

broker負載均衡

分區數量負載:各臺broker的partition數量應該均勻

partition Replica分配算法如下:

  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序
  2. 將第i個Partition分配到第(i mod n)個Broker上
  3. 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

容量大小負載:每臺broker的硬盤佔用大小應該均勻

在kafka1.1之前,Kafka能夠保證各臺broker上partition數量均勻,但由於每個partition內的消息數不同,可能存在不同硬盤之間內存佔用差異大的情況。在Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh,我們可以結合它和監控系統,實現自動化的負載均衡

Kafka高可用

在介紹kafka高可用之前先介紹幾個概念

  • 同步複製:要求所有能工作的Follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率
  • 異步複製:Follower異步的從Leader pull數據,data只要被Leader寫入log認為已經commit,這種情況下如果Follower落後於Leader的比較多,如果Leader突然宕機,會丟失數據

Isr

Kafka結合同步複製和異步複製,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數據不丟失和吞吐率之間做了平衡。Producer只需把消息發送到Partition Leader,Leader將消息寫入本地Log。Follower則從Leader pull數據。Follower在收到該消息向Leader發送ACK。一旦Leader收到了ISR中所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。這樣如果leader掛了,只要Isr中有一個replica存活,就不會丟數據。

Isr動態更新

Leader會跟蹤ISR,如果ISR中一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的消息落後於Leader後的條數超過預定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發送fetch請求。

broker Nodes In Zookeeper

/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

Kafka面試,看這一篇文章就夠了,文末有福利

新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不立即轉化到PreparingRebalance狀態來開啟reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改為PreparingRebalance(開啟reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啟動,那麼group狀態先轉化為InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換為PreparingRebalance(開啟reblance)

Broker設計原理

Broker 是Kafka 集群中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集群節點的管理等。由於涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享

broker zk註冊

Kafka面試,看這一篇文章就夠了,文末有福利

broker消息存儲

  • Kafka的消息以二進制的方式緊湊地存儲,節省了很大空間
  • 此外消息存在ByteBuffer而不是堆,這樣broker進程掛掉時,數據不會丟失,同時避免了gc問題
  • 通過零拷貝和順序尋址,讓消息存儲和讀取速度都非常快
  • 處理fetch請求的時候通過zero-copy 加快速度

broker狀態數據

broker設計中,每臺機器都保存了相同的狀態數據。主要包括以下:

  • controller所在的broker ID,即保存了當前集群中controller是哪臺broker
  • 集群中所有broker的信息:比如每臺broker的ID、機架信息以及配置的若干組連接信息
  • 集群中所有節點的信息:嚴格來說,它和上一個有些重複,不過此項是按照broker ID和監聽器類型進行分組的。對於超大集群來說,使用這一項緩存可以快速地定位和查找給定節點信息,而無需遍歷上一項中的內容,算是一個優化吧
  • 集群中所有分區的信息:所謂分區信息指的是分區的leader、ISR和AR信息以及當前處於offline狀態的副本集合。這部分數據按照topic-partitionID進行分組,可以快速地查找到每個分區的當前狀態。(注:AR表示assigned replicas,即創建topic時為該分區分配的副本集合)

broker負載均衡

分區數量負載:各臺broker的partition數量應該均勻

partition Replica分配算法如下:

  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序
  2. 將第i個Partition分配到第(i mod n)個Broker上
  3. 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

容量大小負載:每臺broker的硬盤佔用大小應該均勻

在kafka1.1之前,Kafka能夠保證各臺broker上partition數量均勻,但由於每個partition內的消息數不同,可能存在不同硬盤之間內存佔用差異大的情況。在Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh,我們可以結合它和監控系統,實現自動化的負載均衡

Kafka高可用

在介紹kafka高可用之前先介紹幾個概念

  • 同步複製:要求所有能工作的Follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率
  • 異步複製:Follower異步的從Leader pull數據,data只要被Leader寫入log認為已經commit,這種情況下如果Follower落後於Leader的比較多,如果Leader突然宕機,會丟失數據

Isr

Kafka結合同步複製和異步複製,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數據不丟失和吞吐率之間做了平衡。Producer只需把消息發送到Partition Leader,Leader將消息寫入本地Log。Follower則從Leader pull數據。Follower在收到該消息向Leader發送ACK。一旦Leader收到了ISR中所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。這樣如果leader掛了,只要Isr中有一個replica存活,就不會丟數據。

Isr動態更新

Leader會跟蹤ISR,如果ISR中一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的消息落後於Leader後的條數超過預定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發送fetch請求。

broker Nodes In Zookeeper

/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息

Kafka面試,看這一篇文章就夠了,文末有福利

Controller負責broker故障檢查&&故障轉移(fail/recover)

  1. Controller在Zookeeper上註冊Watch,一旦有Broker宕機,其在Zookeeper對應的znode會自動被刪除,Zookeeper會觸發 Controller註冊的watch,Controller讀取最新的Broker信息
  2. Controller確定set_p,該集合包含了宕機的所有Broker上的所有Partition
  3. 對set_p中的每一個Partition,選舉出新的leader、Isr,並更新結果。
  4. 3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
  5. 3.2 決定該Partition的新Leader和Isr。如果當前ISR中有至少一個Replica還倖存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個倖存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)
"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

Kafka面試,看這一篇文章就夠了,文末有福利

新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不立即轉化到PreparingRebalance狀態來開啟reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改為PreparingRebalance(開啟reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啟動,那麼group狀態先轉化為InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換為PreparingRebalance(開啟reblance)

Broker設計原理

Broker 是Kafka 集群中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集群節點的管理等。由於涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享

broker zk註冊

Kafka面試,看這一篇文章就夠了,文末有福利

broker消息存儲

  • Kafka的消息以二進制的方式緊湊地存儲,節省了很大空間
  • 此外消息存在ByteBuffer而不是堆,這樣broker進程掛掉時,數據不會丟失,同時避免了gc問題
  • 通過零拷貝和順序尋址,讓消息存儲和讀取速度都非常快
  • 處理fetch請求的時候通過zero-copy 加快速度

broker狀態數據

broker設計中,每臺機器都保存了相同的狀態數據。主要包括以下:

  • controller所在的broker ID,即保存了當前集群中controller是哪臺broker
  • 集群中所有broker的信息:比如每臺broker的ID、機架信息以及配置的若干組連接信息
  • 集群中所有節點的信息:嚴格來說,它和上一個有些重複,不過此項是按照broker ID和監聽器類型進行分組的。對於超大集群來說,使用這一項緩存可以快速地定位和查找給定節點信息,而無需遍歷上一項中的內容,算是一個優化吧
  • 集群中所有分區的信息:所謂分區信息指的是分區的leader、ISR和AR信息以及當前處於offline狀態的副本集合。這部分數據按照topic-partitionID進行分組,可以快速地查找到每個分區的當前狀態。(注:AR表示assigned replicas,即創建topic時為該分區分配的副本集合)

broker負載均衡

分區數量負載:各臺broker的partition數量應該均勻

partition Replica分配算法如下:

  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序
  2. 將第i個Partition分配到第(i mod n)個Broker上
  3. 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

容量大小負載:每臺broker的硬盤佔用大小應該均勻

在kafka1.1之前,Kafka能夠保證各臺broker上partition數量均勻,但由於每個partition內的消息數不同,可能存在不同硬盤之間內存佔用差異大的情況。在Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh,我們可以結合它和監控系統,實現自動化的負載均衡

Kafka高可用

在介紹kafka高可用之前先介紹幾個概念

  • 同步複製:要求所有能工作的Follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率
  • 異步複製:Follower異步的從Leader pull數據,data只要被Leader寫入log認為已經commit,這種情況下如果Follower落後於Leader的比較多,如果Leader突然宕機,會丟失數據

Isr

Kafka結合同步複製和異步複製,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數據不丟失和吞吐率之間做了平衡。Producer只需把消息發送到Partition Leader,Leader將消息寫入本地Log。Follower則從Leader pull數據。Follower在收到該消息向Leader發送ACK。一旦Leader收到了ISR中所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。這樣如果leader掛了,只要Isr中有一個replica存活,就不會丟數據。

Isr動態更新

Leader會跟蹤ISR,如果ISR中一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的消息落後於Leader後的條數超過預定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發送fetch請求。

broker Nodes In Zookeeper

/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息

Kafka面試,看這一篇文章就夠了,文末有福利

Controller負責broker故障檢查&&故障轉移(fail/recover)

  1. Controller在Zookeeper上註冊Watch,一旦有Broker宕機,其在Zookeeper對應的znode會自動被刪除,Zookeeper會觸發 Controller註冊的watch,Controller讀取最新的Broker信息
  2. Controller確定set_p,該集合包含了宕機的所有Broker上的所有Partition
  3. 對set_p中的每一個Partition,選舉出新的leader、Isr,並更新結果。
  4. 3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
  5. 3.2 決定該Partition的新Leader和Isr。如果當前ISR中有至少一個Replica還倖存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個倖存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)
Kafka面試,看這一篇文章就夠了,文末有福利

3.3 更新Leader、ISR、leader_epoch、controller_epoch:寫入/brokers/topics/[topic]/partitions/[partition]/state

  1. 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。

Controller掛掉

每個 broker 都會在 zookeeper 的臨時節點 "/controller" 註冊 watcher,當 controller 宕機時 "/controller" 會消失,觸發broker的watch,每個 broker 都嘗試創建新的 controller path,只有一個競選成功並當選為 controller。

使用Kafka如何保證冪等性

不丟消息

  • 首先kafka保證了對已提交消息的at least保證
  • Sender有重試機制
  • producer業務方在使用producer發送消息時,註冊回調函數。在onError方法中重發消息
  • consumer 拉取到消息後,處理完畢再commit,保證commit的消息一定被處理完畢

不重複

  • consumer拉取到消息先保存,commit成功後刪除緩存數據

Kafka高性能

  • partition提升了併發
  • zero-copy
  • 順序寫入
  • 消息聚集batch
  • 頁緩存

業務方對 Kafka producer的優化

  • 增大producer數量
  • ack配置
  • batch

最後,小編為大家準備了一些適合於1-5年以上開發經驗的java程序員面試涉及到的絕大部分面試題及答案做成了文檔和學習筆記文件以及架構視頻資料免費分享給大家(包括Dubbo、Redis、Netty、zookeeper、Spring cloud、分佈式、高併發等架構技術資料),希望可以幫助到大家。

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

Kafka面試,看這一篇文章就夠了,文末有福利

新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不立即轉化到PreparingRebalance狀態來開啟reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改為PreparingRebalance(開啟reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啟動,那麼group狀態先轉化為InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換為PreparingRebalance(開啟reblance)

Broker設計原理

Broker 是Kafka 集群中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集群節點的管理等。由於涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享

broker zk註冊

Kafka面試,看這一篇文章就夠了,文末有福利

broker消息存儲

  • Kafka的消息以二進制的方式緊湊地存儲,節省了很大空間
  • 此外消息存在ByteBuffer而不是堆,這樣broker進程掛掉時,數據不會丟失,同時避免了gc問題
  • 通過零拷貝和順序尋址,讓消息存儲和讀取速度都非常快
  • 處理fetch請求的時候通過zero-copy 加快速度

broker狀態數據

broker設計中,每臺機器都保存了相同的狀態數據。主要包括以下:

  • controller所在的broker ID,即保存了當前集群中controller是哪臺broker
  • 集群中所有broker的信息:比如每臺broker的ID、機架信息以及配置的若干組連接信息
  • 集群中所有節點的信息:嚴格來說,它和上一個有些重複,不過此項是按照broker ID和監聽器類型進行分組的。對於超大集群來說,使用這一項緩存可以快速地定位和查找給定節點信息,而無需遍歷上一項中的內容,算是一個優化吧
  • 集群中所有分區的信息:所謂分區信息指的是分區的leader、ISR和AR信息以及當前處於offline狀態的副本集合。這部分數據按照topic-partitionID進行分組,可以快速地查找到每個分區的當前狀態。(注:AR表示assigned replicas,即創建topic時為該分區分配的副本集合)

broker負載均衡

分區數量負載:各臺broker的partition數量應該均勻

partition Replica分配算法如下:

  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序
  2. 將第i個Partition分配到第(i mod n)個Broker上
  3. 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

容量大小負載:每臺broker的硬盤佔用大小應該均勻

在kafka1.1之前,Kafka能夠保證各臺broker上partition數量均勻,但由於每個partition內的消息數不同,可能存在不同硬盤之間內存佔用差異大的情況。在Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh,我們可以結合它和監控系統,實現自動化的負載均衡

Kafka高可用

在介紹kafka高可用之前先介紹幾個概念

  • 同步複製:要求所有能工作的Follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率
  • 異步複製:Follower異步的從Leader pull數據,data只要被Leader寫入log認為已經commit,這種情況下如果Follower落後於Leader的比較多,如果Leader突然宕機,會丟失數據

Isr

Kafka結合同步複製和異步複製,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數據不丟失和吞吐率之間做了平衡。Producer只需把消息發送到Partition Leader,Leader將消息寫入本地Log。Follower則從Leader pull數據。Follower在收到該消息向Leader發送ACK。一旦Leader收到了ISR中所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。這樣如果leader掛了,只要Isr中有一個replica存活,就不會丟數據。

Isr動態更新

Leader會跟蹤ISR,如果ISR中一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的消息落後於Leader後的條數超過預定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發送fetch請求。

broker Nodes In Zookeeper

/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息

Kafka面試,看這一篇文章就夠了,文末有福利

Controller負責broker故障檢查&&故障轉移(fail/recover)

  1. Controller在Zookeeper上註冊Watch,一旦有Broker宕機,其在Zookeeper對應的znode會自動被刪除,Zookeeper會觸發 Controller註冊的watch,Controller讀取最新的Broker信息
  2. Controller確定set_p,該集合包含了宕機的所有Broker上的所有Partition
  3. 對set_p中的每一個Partition,選舉出新的leader、Isr,並更新結果。
  4. 3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
  5. 3.2 決定該Partition的新Leader和Isr。如果當前ISR中有至少一個Replica還倖存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個倖存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)
Kafka面試,看這一篇文章就夠了,文末有福利

3.3 更新Leader、ISR、leader_epoch、controller_epoch:寫入/brokers/topics/[topic]/partitions/[partition]/state

  1. 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。

Controller掛掉

每個 broker 都會在 zookeeper 的臨時節點 "/controller" 註冊 watcher,當 controller 宕機時 "/controller" 會消失,觸發broker的watch,每個 broker 都嘗試創建新的 controller path,只有一個競選成功並當選為 controller。

使用Kafka如何保證冪等性

不丟消息

  • 首先kafka保證了對已提交消息的at least保證
  • Sender有重試機制
  • producer業務方在使用producer發送消息時,註冊回調函數。在onError方法中重發消息
  • consumer 拉取到消息後,處理完畢再commit,保證commit的消息一定被處理完畢

不重複

  • consumer拉取到消息先保存,commit成功後刪除緩存數據

Kafka高性能

  • partition提升了併發
  • zero-copy
  • 順序寫入
  • 消息聚集batch
  • 頁緩存

業務方對 Kafka producer的優化

  • 增大producer數量
  • ack配置
  • batch

最後,小編為大家準備了一些適合於1-5年以上開發經驗的java程序員面試涉及到的絕大部分面試題及答案做成了文檔和學習筆記文件以及架構視頻資料免費分享給大家(包括Dubbo、Redis、Netty、zookeeper、Spring cloud、分佈式、高併發等架構技術資料),希望可以幫助到大家。

Kafka面試,看這一篇文章就夠了,文末有福利

"

本文轉自:靳剛同學

MQ(消息隊列)是跨進程通信的方式之一,可理解為異步rpc,上游系統對調用結果的態度往往是重要不緊急。使用消息隊列有以下好處:業務解耦、流量削峰、靈活擴展。接下來介紹消息中間件Kafka。

Kafka是什麼?

Kafka是一個分佈式的消息引擎。具有以下特徵

  1. 能夠發佈和訂閱消息流(類似於消息隊列)
  2. 以容錯的、持久的方式存儲消息流
  3. 多分區概念,提高了並行能力

Kafka架構總覽

Kafka面試,看這一篇文章就夠了,文末有福利

Topic

消息的主題、隊列,每一個消息都有它的topic,Kafka通過topic對消息進行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(Partition),每個分區在物理上對應一個文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴展。

Partition

每個分區都是一個 順序的、不可變的消息隊列, 並且可以持續的添加;分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

producer在發佈消息的時候,可以為每條消息指定Key,這樣消息被髮送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設置的合理,那麼所有的消息將會被均勻的分佈到不同的分區中,這樣就實現了負載均衡。

Kafka面試,看這一篇文章就夠了,文末有福利

Broker

Kafka server,用來存儲消息,Kafka集群中的每一個服務器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發送消息,生產者會根據topic分發消息。生產者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。算法可由開發者定義。

Cousumer

Consermer實例可以是獨立的進程,負責訂閱和消費消息。消費者用consumerGroup來標識自己。同一個消費組可以併發地消費多個分區的消息,同一個partition也可以由多個consumerGroup併發消費,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發送給其中一個Consumer

Kafka producer 設計原理

發送消息的流程

Kafka面試,看這一篇文章就夠了,文末有福利

1.序列化消息&&.計算partition

根據key和value的配置對消息進行序列化,然後計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition。否則根據key和topic的partition數目取餘,如果key也沒有的話就隨機生成一個counter,使用這個counter來和partition數目取餘。這個counter每次使用的時候遞增。

2發送到batch&&喚醒Sender 線程

根據topic-partition獲取對應的batchs(Dueue<ProducerBatch>),然後將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊列的操作是加鎖執行,所以batch內消息時有序的。後續的Sender操作當前方法異步操作。

Kafka面試,看這一篇文章就夠了,文末有福利

3.Sender把消息有序發到 broker(tp replia leader)

3.1 確定tp relica leader 所在的broker

  • Kafka中 每臺broker都保存了kafka集群的metadata信息,metadata信息裡包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同時根據metadata更新策略(定期更新metadata.max.age.ms、失效檢測,強制更新:檢查到metadata失效以後,調用metadata.requestUpdate()強制更新
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;}

3.2 冪等性發送

為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,則Broker會接受它,否則將其丟棄:

  • 如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
  • Sender發送失敗後會重試,這樣可以保證每個消息都被髮送到broker

4. Sender處理broker發來的produce response

一旦broker處理完Sender的produce請求,就會發送produce response給Sender,此時producer將執行我們為send()設置的回調函數。至此producer的send執行完畢。

吞吐性&&延時:

  • buffer.memory:buffer設置大了有助於提升吞吐性,但是batch太大會增大延遲,可搭配linger_ms參數使用
  • linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢,我們可以強制在linger_ms時間後發送batch數據
  • ack:producer收到多少broker的答覆才算真的發送成功
  • 0表示producer無需等待leader的確認(吞吐最高、數據可靠性最差)
  • 1代表需要leader確認寫入它的本地log並立即確認
  • -1/all 代表所有的ISR都完成後確認(吞吐最低、數據可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例,新增到broker的長連接。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }
Kafka面試,看這一篇文章就夠了,文末有福利

終端查看TCP連接數:

lsof -p portNum -np | grep TCP

Consumer設計原理

poll消息

Kafka面試,看這一篇文章就夠了,文末有福利

  • 消費者通過fetch線程拉消息(單線程)
  • 消費者通過心跳線程來與broker發送心跳。超時會認為掛掉
  • 每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出,以及消費消息的位移都由coordnator處理。

位移管理

consumer的消息位移代表了當前group對topic-partition的消費進度,consumer宕機重啟後可以繼續從該offset開始消費。

在kafka0.8之前,位移信息存放在zookeeper上,由於zookeeper不適合高併發的讀寫,新版本Kafka把位移信息當成消息,發往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認有50個分區。

消息的key 是groupId+topic_partition,value 是offset.

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利


Kafka Group 狀態

Kafka面試,看這一篇文章就夠了,文末有福利

  • Empty:初始狀態,Group 沒有任何成員,如果所有的 offsets 都過期的話就會變成 Dead
  • PreparingRebalance:Group 正在準備進行 Rebalance
  • AwaitingSync:Group 正在等待來 group leader 的 分配方案
  • Stable:穩定的狀態(Group is stable);
  • Dead:Group 內已經沒有成員,並且它的 Metadata 已經被移除
  • 注意

重平衡reblance

當一些原因導致consumer對partition消費不再均勻時,kafka會自動執行reblance,使得consumer對partition的消費再次平衡。

什麼時候發生rebalance?:

  • 組訂閱topic數變更
  • topic partition數變更
  • consumer成員變更
  • consumer 加入群組或者離開群組的時候
  • consumer被檢測為崩潰的時候

reblance過程

舉例1 consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內沒和broker發送心跳,此時coordnator認為該group應該進行reblance。接下來其他consumer發來fetch請求後,coordnator將回復他們進行reblance通知。當consumer成員收到請求後,只有leader會根據分配策略進行分配,然後把各自的分配結果返回給coordnator。這個時候只有consumer leader返回的是實質數據,其他返回的都為空。收到分配方法後,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

  1. 使用join協議,表示有consumer 要加入到group中
  2. 使用sync 協議,根據分配規則進行分配
Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

(上圖圖片摘自網絡)

引申:以上reblance機制存在的問題

在大型系統中,一個topic可能對應數百個consumer實例。這些consumer陸續加入到一個空消費組將導致多次的rebalance;此外consumer 實例啟動的時間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會再次觸發rebalance,而每次rebalance的代價又相當地大,因為很多狀態都需要在rebalance前被持久化,而在rebalance後被重新初始化。

新版本改進

通過延遲進入PreparingRebalance狀態減少reblance次數

Kafka面試,看這一篇文章就夠了,文末有福利

新版本新增了group.initial.rebalance.delay.ms參數。空消費組接受到成員加入請求時,不立即轉化到PreparingRebalance狀態來開啟reblance。當時間超過group.initial.rebalance.delay.ms後,再把group狀態改為PreparingRebalance(開啟reblance)。實現機制是在coordinator底層新增一個group狀態:InitialReblance。假設此時有多個consumer陸續啟動,那麼group狀態先轉化為InitialReblance,待group.initial.rebalance.delay.ms時間後,再轉換為PreparingRebalance(開啟reblance)

Broker設計原理

Broker 是Kafka 集群中的節點。負責處理生產者發送過來的消息,消費者消費的請求。以及集群節點的管理等。由於涉及內容較多,先簡單介紹,後續專門抽出一篇文章分享

broker zk註冊

Kafka面試,看這一篇文章就夠了,文末有福利

broker消息存儲

  • Kafka的消息以二進制的方式緊湊地存儲,節省了很大空間
  • 此外消息存在ByteBuffer而不是堆,這樣broker進程掛掉時,數據不會丟失,同時避免了gc問題
  • 通過零拷貝和順序尋址,讓消息存儲和讀取速度都非常快
  • 處理fetch請求的時候通過zero-copy 加快速度

broker狀態數據

broker設計中,每臺機器都保存了相同的狀態數據。主要包括以下:

  • controller所在的broker ID,即保存了當前集群中controller是哪臺broker
  • 集群中所有broker的信息:比如每臺broker的ID、機架信息以及配置的若干組連接信息
  • 集群中所有節點的信息:嚴格來說,它和上一個有些重複,不過此項是按照broker ID和監聽器類型進行分組的。對於超大集群來說,使用這一項緩存可以快速地定位和查找給定節點信息,而無需遍歷上一項中的內容,算是一個優化吧
  • 集群中所有分區的信息:所謂分區信息指的是分區的leader、ISR和AR信息以及當前處於offline狀態的副本集合。這部分數據按照topic-partitionID進行分組,可以快速地查找到每個分區的當前狀態。(注:AR表示assigned replicas,即創建topic時為該分區分配的副本集合)

broker負載均衡

分區數量負載:各臺broker的partition數量應該均勻

partition Replica分配算法如下:

  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序
  2. 將第i個Partition分配到第(i mod n)個Broker上
  3. 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

容量大小負載:每臺broker的硬盤佔用大小應該均勻

在kafka1.1之前,Kafka能夠保證各臺broker上partition數量均勻,但由於每個partition內的消息數不同,可能存在不同硬盤之間內存佔用差異大的情況。在Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh,我們可以結合它和監控系統,實現自動化的負載均衡

Kafka高可用

在介紹kafka高可用之前先介紹幾個概念

  • 同步複製:要求所有能工作的Follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率
  • 異步複製:Follower異步的從Leader pull數據,data只要被Leader寫入log認為已經commit,這種情況下如果Follower落後於Leader的比較多,如果Leader突然宕機,會丟失數據

Isr

Kafka結合同步複製和異步複製,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數據不丟失和吞吐率之間做了平衡。Producer只需把消息發送到Partition Leader,Leader將消息寫入本地Log。Follower則從Leader pull數據。Follower在收到該消息向Leader發送ACK。一旦Leader收到了ISR中所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。這樣如果leader掛了,只要Isr中有一個replica存活,就不會丟數據。

Isr動態更新

Leader會跟蹤ISR,如果ISR中一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的消息落後於Leader後的條數超過預定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發送fetch請求。

broker Nodes In Zookeeper

/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息

Kafka面試,看這一篇文章就夠了,文末有福利

Controller負責broker故障檢查&&故障轉移(fail/recover)

  1. Controller在Zookeeper上註冊Watch,一旦有Broker宕機,其在Zookeeper對應的znode會自動被刪除,Zookeeper會觸發 Controller註冊的watch,Controller讀取最新的Broker信息
  2. Controller確定set_p,該集合包含了宕機的所有Broker上的所有Partition
  3. 對set_p中的每一個Partition,選舉出新的leader、Isr,並更新結果。
  4. 3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
  5. 3.2 決定該Partition的新Leader和Isr。如果當前ISR中有至少一個Replica還倖存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個倖存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)
Kafka面試,看這一篇文章就夠了,文末有福利

3.3 更新Leader、ISR、leader_epoch、controller_epoch:寫入/brokers/topics/[topic]/partitions/[partition]/state

  1. 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。

Controller掛掉

每個 broker 都會在 zookeeper 的臨時節點 "/controller" 註冊 watcher,當 controller 宕機時 "/controller" 會消失,觸發broker的watch,每個 broker 都嘗試創建新的 controller path,只有一個競選成功並當選為 controller。

使用Kafka如何保證冪等性

不丟消息

  • 首先kafka保證了對已提交消息的at least保證
  • Sender有重試機制
  • producer業務方在使用producer發送消息時,註冊回調函數。在onError方法中重發消息
  • consumer 拉取到消息後,處理完畢再commit,保證commit的消息一定被處理完畢

不重複

  • consumer拉取到消息先保存,commit成功後刪除緩存數據

Kafka高性能

  • partition提升了併發
  • zero-copy
  • 順序寫入
  • 消息聚集batch
  • 頁緩存

業務方對 Kafka producer的優化

  • 增大producer數量
  • ack配置
  • batch

最後,小編為大家準備了一些適合於1-5年以上開發經驗的java程序員面試涉及到的絕大部分面試題及答案做成了文檔和學習筆記文件以及架構視頻資料免費分享給大家(包括Dubbo、Redis、Netty、zookeeper、Spring cloud、分佈式、高併發等架構技術資料),希望可以幫助到大家。

Kafka面試,看這一篇文章就夠了,文末有福利

Kafka面試,看這一篇文章就夠了,文末有福利

領取書籍300集Java視頻教程的方式:

1、關注小編,並轉發本文

2、私信小編:“學習”就可以免費領取啦

"

相關推薦

推薦中...