Java開發大型互聯網Kafka設計原理之Kafka集群部署實踐

編程語言 Java Hadoop CPU 圖靈學院 圖靈學院 2017-11-01

引言

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消費。

kafka簡述

Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式發佈/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既可以達到10W/s的吞吐速率;完全的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的並行加載機制來統一了在線和離線的消息處理。Apache Kafka相對於ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分佈式系統。

kafka架構

一個典型的kafka集群中包含若干producer(可以是web前端產生的page view,或者是服務器日誌,系統CPU、memory等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干consumer group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發佈到broker,consumer使用pull模式從broker訂閱並消費消息。

使用

對於一些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka並沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用作為"常規"的消息系統,在一定程度上,尚未確保消息的發送與接收絕對可靠(比如,消息重發,消息發送丟失等),kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/用戶操作等信息發送到kafka中.並實時監控,或者離線統計分析等。kafka的特性決定它非常適合作為"日誌收集中心";application可以將操作日誌"批量""異步"的發送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.

原理

kafka數據存儲設計

partition以文件形式存儲在文件系統,目錄命名規則:-,例如,名為test的topic,其有3個partition,則Kafka數據目錄中有3個目錄:test-0, test-1, test-2,分別存儲相應partition的數據。

partition的數據文件

partition中的每條Message包含了以下三個屬性:

offset

MessageSize

data

其中offset表示Message在這個partition中的偏移量,offset不是該Message在partition數據文件中的實際存儲位置,而是邏輯上一個值,它唯一確定了partition中的一條Message,可以認為offset是partition中Message的id;MessageSize表示消息內容data的大小;data為Message的具體內容。

partition的數據文件由以上格式的Message組成,按offset由小到大排列在一起。

如果一個partition只有一個數據文件:

新數據是添加在文件末尾,不論文件數據文件有多大,這個操作永遠都是O(1)的。

查找某個offset的Message是順序查找的。因此,如果數據文件很大的話,查找的效率就低。

Kafka通過分段和索引來提高查找效率。

數據文件分段segment

partition物理上由多個segment文件組成,每個segment大小相等,順序讀寫。每個segment數據文件以該段中最小的offset命名,文件擴展名為.log。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個segment數據文件中。

kafka集群部署

kafka配置文件(server.properties)

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=0

# Switch to enable topic deletion or not, default value is false

#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

# FORMAT:

# listeners = listener_name://host_name:port

# EXAMPLE:

# listeners = PLAINTEXT://your.host.name:9092

#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured. Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details

#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads handling network requests(最好設置為cpu核數+1)

num.network.threads=3

# The number of threads doing disk I/O(最好設置為cpu核數的2倍值)

num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files

log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

# This value is recommended to be increased for installations with data dirs located in RAID array.

num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

# 1. Durability: Unflushed data may be lost if you are not using replication.

# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age

log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.

#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000

Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker

Topic:每條發佈到Kafka集群的消息都有一個類別,這個類別被稱為topic。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)

Partition:parition是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件

Producer:負責發佈消息到Kafka broker

Consumer:消費消息。每個consumer屬於一個特定的consumer group(可為每個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。

總結

以 上就是我對Java開發大型互聯網Kafka設計原理之Kafka集群部署實踐問題及其優化總結,分享給大家,希望大家知道什麼是Java開發大型互聯網Kafka設計原理之Kafka集群部署實踐問題及其優化。覺得收穫的話可以點個關注收藏轉發一波喔,謝謝大佬們支持!

  • 1、多寫多敲代碼,好的代碼與紮實的基礎知識一定是實踐出來的

  • 2、可以去百度搜索騰訊課堂圖靈學院的視頻來學習一下java架構實戰案例,還挺不錯的。

  • 最後,每一位讀到這裡的網友,感謝你們能耐心地看完。希望在成為一名更優秀的Java程序員的道路上,我們可以一起學習、一起進步!都能贏取白富美,走向架構師的人生巔峰!

  • 3丶想了解學習以上課程內容可加群:469717771 驗證碼頭條(06 必過)歡迎大家的加入喲!

Java開發大型互聯網Kafka設計原理之Kafka集群部署實踐

相關推薦

推薦中...