基礎知識篇
1、 AMQP協議
Advanced Message Queuing Protocol (高級消息隊列協議)
The Advanced Message Queuing Protocol (AMQP):是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了通過網絡發送的字節流的數據格式。因此兼容性非常好,任何實現AMQP協議的程序都可以和與AMQP協議兼容的其他程序交互,可以很容易做到跨語言,跨平臺。
上面說的3種比較流行的消息隊列協議,要麼支持AMQP協議,要麼借鑑了AMQP協議的思想進行了開發、實現、設計。
2、 一些基本的概念
1、消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序。(我的理解就是處理消息的角色,比如執行將消息處理後入庫)
2、生產者:(Producer) :向broker發佈消息的應用程序。
3、AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,所以對於kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)
3、Kafka架構
生產者生產消息、kafka集群、消費者獲取消息這樣一種架構,如下圖:
kafka集群中的消息,是通過Topic(主題)來進行組織的,如下圖:
一些基本的名詞解釋:
1、主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。
2、分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,分區裡面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
工作圖:
備份(Replication):為了保證分佈式可靠性,kafka0.8開始對每個分區的數據進行備份(不同的Broker上),防止其中一個Broker宕機造成分區上的數據不可用。
安裝篇
先下載kafka的安裝包
命令: wget http://mirrors.hust.edu.cn/apache/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz
2.然後把kafka解壓到安裝目錄下,重命名kafka安裝包,方便配置
命令:tar -zvxf kafka_2.10-0.10.2.1.tgz -C ~/Document/jianghongfeng/kafka/
命令:mv kafka_2.10-0.10.2.1 kafk
進入kafka/bin文件夾,這裡有各種功能腳本,包括髮送消息,消費消息、創建topic,查看topic以及各種啟動、停止服務腳本等
3.啟動zookeeper
命令:sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
啟動後會停在這個界面,說明啟動成功。
4.啟動kafka服務
命令: sh kafka-server-start.sh ../config/server.properties
配置篇
1.開放2181/9092端口,編輯防火牆規則,並重啟防火牆。
命令:vim /etc/sysconfig/iptables
命令:service iptables restart
2.配置kafka的server.properties 。
命令: vim ../config/server.properties (當前位置是kafka的bin目錄)
(修改部分是加粗了的部分,不修改部分未列出)
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://自己機器IP:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://自己機器IP:9092
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=自己機器IP:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
測試篇
1.創建topic
命令:sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jianghongfeng
localhost可以改成你機器的IP,但是前提是配置了server.properties 。
2.查看topic列表
命令:sh kafka-topics.sh --list --zookeeper localhost:2181
localhost可以改成你機器的IP,但是前提是配置了server.properties 。
3.發送消息
命令: sh kafka-console-producer.sh --broker-list localhost:9092 --topic jianghongfeng
localhost可以改成你機器的IP,但是前提是配置了server.properties 。
4.客戶端接收消息
命令:sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic jianghongfeng --from-beginning
localhost可以改成你機器的IP,但是前提是配置了server.properties 。
說明
如果是虛擬機搭建Kafka單機環境,配置虛擬機的時候把網絡配置改成橋接模式會穩定很多。
關於防火牆的問題可以參考我的上篇文章:雲服務器CentOS新機搭建JAVA+Tomcat運行環境
以上內容部分來自網絡,加上了我自己的一些理解和操作截圖,參考網址如下:
http://blog.csdn.net/fuyuwei2015/article/details/73379055
http://blog.csdn.net/xufan007/article/details/51898262