以下文章來自賈云社區,作者曲志平。
簡介|kafka3.0版本已經嘗試將kafka架構實現到zk。如果去掉zk,新版卡夫卡中用什么技術代替zk?接下來,我們來了解和學習一下卡夫卡內置的共識機制和raft算法。
一、Kafka簡介
Kafka是一個開源的消息引擎系統。一個典型的Kafka架構包括幾個生產者、幾個經紀人、幾個消費者和一個ZooKeeper集群,如上圖所示。其中,ZooKeeper被Kafka用來管理集群元數據、選擇控制器等操作。生產者向代理發送消息,代理負責將收到的消息存儲到磁盤,消費者負責訂閱和消費來自代理的消息。
(一)Kafka核心組件
生產者:消息生產者是向代理發送消息的客戶機。
消費者:消息消費者是從代理獲取數據的客戶機。
消費者群體:消費者群體,由若干消費者組成。一個消費群中的每個消費者負責消費不同的分區,一個分區只能被同一消費群中的一個消費者消費;消費者相互獨立,互不影響。所有的消費者都屬于某一個消費者群體,即該消費者群體是一個邏輯訂閱者。
代理:一個服務器是一個代理,一個集群由多個代理組成,一個代理可以有多個主題。
主題:可以理解為一個隊列,所有的生產者和消費者都是面向主題的。
分區:分區。kafka中的主題將它分發給不同的代理,以提高可伸縮性和實現高可用性。一個主題可以分為多個分區,每個分區都是有序的,即消息發送到隊列的順序與消費時拉取的順序一致。
復制:副本。一個主題對應的分區可以有多個副本,其中只有一個是領導者,其余都是跟隨者。為了保證數據的高可用性,leader和follower會盡可能的均勻分布在broker中,避免因為leader所在的服務器宕機而導致話題不可用的問題。
(二)kafka2當中zk的作用
/admin:卡夫卡的關鍵信息,包括被刪除的話題,都會保存在這個路徑下。
/brokers:主要用于保存kafka集群中的經紀人信息和未被刪除的話題信息。
/cluster3360主要用于存儲kafka集群的唯一id信息。每個kafka集群將被分配一個唯一的id和相應的版本號。
/config:群集配置信息。
/Controller:Kafka集群中的控制器信息,控制器組件,是Apache Kafka的核心組件。它的主要功能是在Apache ZooKeeper的幫助下管理和協調整個Kafka集群。
/controller_epoch:主要用于記錄控制器選舉的次數。
/ISR _ change _ notification:當ISR列表更改時。
通知,在kafka當中由于存在ISR列表變更的情況發生,為了保證ISR列表更新的及時性,定義了isr_change_notification這個節點,主要用于通知Controller來及時將ISR列表進行變更。/latest_producer_id_block:使用`/latest_producer_id_block`節點來保存PID塊,主要用于能夠保證生產者的任意寫入請求都能夠得到響應。
/log_dir_event_notification:主要用于保存當broker當中某些LogDir出現異常時候,例如磁盤損壞,文件讀寫失敗等異常時候,向ZK當中增加一個通知序號,controller監聽到這個節點的變化之后,就會做出對應的處理操作。
以上就是kafka在zk當中保留的所有的所有的相關的元數據信息,這些元數據信息保證了kafka集群的正常運行。
二、kafka3的安裝配置
在kafka3的版本當中已經徹底去掉了對zk的依賴,如果沒有了zk集群,那么kafka當中是如何保存元數據信息的呢,這里我們通過kafka3的集群來一探究竟。
(一)kafka安裝配置核心重要參數
Controller服務器
不管是kafka2還是kafka3當中,controller控制器都是必不可少的,通過controller控制器來維護kafka集群的正常運行,例如ISR列表的變更,broker的上線或者下線,topic的創建,分區的指定等等各種操作都需要依賴于Controller,在kafka2當中,controller的選舉需要通過zk來實現,我們沒法控制哪些機器選舉成為Controller,而在kafka3當中,我們可以通過配置文件來自己指定哪些機器成為Controller,這樣做的好處就是我們可以指定一些配置比較高的機器作為Controller節點,從而保證controller節點的穩健性。
被選中的controller節點參與元數據集群的選舉,每個controller節點要么是Active狀態,或者就是standBy狀態。
Process.Roles
使用KRaft模式來運行kafka集群的話,我們有一個配置叫做Process.Roles必須配置,這個參數有以下四個值可以進行配置:
Process.Roles=Broker, 服務器在KRaft模式中充當Broker。
Process.Roles=Controller, 服務器在KRaft模式下充當Controller。
Process.Roles=Broker,Controller,服務器在KRaft模式中同時充當Broker和Controller。
如果process.roles沒有設置。那么集群就假定是運行在ZooKeeper模式下。
如果需要從zookeeper模式轉換成為KRaft模式,那么需要進行重新格式化。如果一個節點同時是Broker和Controller節點,那么就稱之為組合節點。
實際工作當中,如果有條件的話,盡量還是將Broker和Controller節點進行分離部署。避免由于服務器資源不夠的情況導致OOM等一系列的問題
Quorum Voters
通過controller.quorum.voters配置來實習哪些節點是Quorum的投票節點,所有想要成為控制器的節點,都必須放到這個配置里面。
每個Broker和每個Controller都必須配置Controller.quorum.voters,該配置當中提供的節點ID必須與提供給服務器的節點ID保持一直。
每個Broker和每個Controller 都必須設置 controller.quorum.voters。需要注意的是,controller.quorum.voters 配置中提供的節點ID必須與提供給服務器的節點ID匹配。
比如在Controller1上,node.Id必須設置為1,以此類推。注意,控制器id不強制要求你從0或1開始。然而,分配節點ID的最簡單和最不容易混淆的方法是給每個服務器一個數字ID,然后從0開始。
(二)下載并解壓安裝包
bigdata01下載kafka的安裝包,并進行解壓:
[hadoop@bigdata01 kraft]$ cd /opt/soft/
[hadoop@bigdata01 soft]$ wget http://archive.apache.org/dist/kafka/3.1./kafka_2.12-3.1..tgz
[hadoop@bigdata01 soft]$ tar -zxf kafka_2.12-3.1..tgz -C /opt/install/
修改kafka的配置文件broker.properties:
[hadoop@bigdata01 kafka_2.12-3.1.]$ cd /opt/install/kafka_2.12-3.1./config/kraft/
[hadoop@bigdata01 kraft]$ vim broker.properties
修改編輯內容如下:
node.id=1
controller.quorum.voters=1@bigdata01:9093
listeners=PLAINTEXT://bigdata01:9092
advertised.listeners=PLAINTEXT://bigdata01:9092
log.dirs=/opt/install/kafka_2.12-3.1.0/kraftlogs
創建兩個文件夾:
[hadoop@bigdata01 kafka_2.12-3.1.]$ mkdir -p /opt/install/kafka_2.12-3.1./kraftlogs
[hadoop@bigdata01 kafka_2.12-3.1.]$ mkdir -p /opt/install/kafka_2.12-3.1./topiclogs
同步安裝包到其他機器上面去。
(三)服務器集群啟動
啟動kafka服務:
[hadoop@bigdata01 kafka_2.12-3.1.]$ ./bin/kafka-storage.sh random-uuid
YkJwr6RESgSJv-sxa1R1mA
[hadoop@bigdata01 kafka_2.12-3.1.]$ ./bin/kafka-storage.sh format -t YkJwr6RESgSJv-sxa1R1mA -c ./config/kraft/server.properties
Formatting /opt/install/kafka_2.12-3.1./topiclogs
[hadoop@bigdata01 kafka_2.12-3.1.]$ ./bin/kafka-server-start.sh ./config/kraft/server.properties
(四)創建kafka的topic
集群啟動成功之后,就可以來創建kafka的topic了,使用以下命令來創建kafka的topic:
./bin/kafka-topics.sh --create --topic kafka_test --partitions 3 --replication-factor 2 --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
(五)任意一臺機器查看kafka的topic
組成集群之后,任意一臺機器就可以通過以下命令來查看到剛才創建的topic了:
[hadoop@bigdata03 ~]$ cd /opt/install/kafka_2.12-3.1./
[hadoop@bigdata03 kafka_2.12-3.1.]$ bin/kafka-topics.sh --list --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
(六)消息生產與消費
使用命令行來生產以及消費kafka當中的消息:
[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test
[hadoop@bigdata02 kafka_2.12-3.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test --from-beginning
三、Kafka當中Raft的介紹
(一)kafka強依賴zk所引發的問題
前面我們已經看到了kafka3集群在沒有zk集群的依賴下,也可以正常運行,那么kafka2在zk當中保存的各種重要元數據信息,在kafka3當中如何實現保存的呢?
kafka一直都是使用zk來管理集群以及所有的topic的元數據,并且使用了zk的強一致性來選舉集群的controller,controller對整個集群的管理至關重要,包括分區的新增,ISR列表的維護,等等很多功能都需要靠controller來實現,然后使用zk來維護kafka的元數據也存在很多的問題以及存在性能瓶頸。
以下是kafka將元數據保存在zk當中的諸多問題。
元數據存取困難
元數據的存取過于困難,每次重新選舉的controller需要把整個集群的元數據重新restore,非常的耗時且影響集群的可用性。
元數據更新網絡開銷大
整個元數據的更新操作也是以全量推的方式進行,網絡的開銷也會非常大。
強耦合違背軟件設計原則
Zookeeper對于運維來說,維護Zookeeper也需要一定的開銷,并且kafka強耦合與zk也并不好,還得時刻擔心zk的宕機問題,違背軟件設計的高內聚,低耦合的原則。
網絡分區復雜度高
Zookeeper本身并不能兼顧到broker與broker之間通信的狀態,這就會導致網絡分區的復雜度成幾何倍數增長。
zk本身不適合做消息隊列
zookeeper不適合做消息隊列,因為zookeeper有1M的消息大小限制 zookeeper的children太多會極大的影響性能znode太大也會影響性能 znode太大會導致重啟zkserver耗時10-15分鐘 zookeeper僅使用內存作為存儲,所以不能存儲太多東西。
并發訪問zk問題多
最好單線程操作zk客戶端,不要并發,臨界、競態問題太多。
基于以上各種問題,所以提出了脫離zk的方案,轉向自助研發強一致性的元數據解決方案,也就是KIP-500。
KIP-500議案提出了在Kafka中處理元數據的更好方法?;舅枷胧?Kafka on Kafka",將Kafka的元數據存儲在Kafka本身中,無需增加額外的外部存儲比如ZooKeeper等。
去zookeeper之后的kafka新的架構
在KIP-500中,Kafka控制器會將其元數據存儲在Kafka分區中,而不是存儲在ZooKeeper中。但是,由于控制器依賴于該分區,因此分區本身不能依賴控制器來進行領導者選舉之類的事情。而是,管理該分區的節點必須實現自我管理的Raft仲裁。
在kafka3.0的新的版本當中,使用了新的KRaft協議,使用該協議來保證在元數據仲裁中準確的復制元數據,這個協議類似于zk當中的zab協議以及類似于Raft協議,但是KRaft協議使用的是基于事件驅動的模式,與ZAB協議和Raft協議還有點不一樣
在kafka3.0之前的的版本當中,主要是借助于controller來進行leader partition的選舉,而在3.0協議當中,使用了KRaft來實現自己選擇leader,并最終令所有節點達成共識,這樣簡化了controller的選舉過程,效果更加高效。
(二)kakfa3 Raft
前面我們已經知道了在kafka3當中可以不用再依賴于zk來保存kafka當中的元數據了,轉而使用Kafka Raft來實現元數據的一致性,簡稱KRaft,并且將元數據保存在kafka自己的服務器當中,大大提高了kafka的元數據管理的性能。
KRaft運行模式的Kafka集群,不會將元數據存儲在Apache ZooKeeper中。即部署新集群的時候,無需部署ZooKeeper集群,因為Kafka將元數據存儲在Controller節點的KRaft Quorum中。KRaft可以帶來很多好處,比如可以支持更多的分區,更快速的切換Controller,也可以避免Controller緩存的元數據和Zookeeper存儲的數據不一致帶來的一系列問題。
在新的版本當中,控制器Controller節點我們可以自己進行指定,這樣最大的好處就是我們可以自己選擇一些配置比較好的機器成為Controller節點,而不像在之前的版本當中,我們無法指定哪臺機器成為Controller節點,而且controller節點與broker節點可以運行在同一臺機器上,并且控制器controller節點不再向broker推送更新消息,而是讓Broker從這個Controller Leader節點進行拉去元數據的更新。
(三)如何查看kafka3當中的元數據信息
在kafka3當中,不再使用zk來保存元數據信息了,那么在kafka3當中如何查看元數據信息呢,我們也可以通過kafka自帶的命令來進行查看元數據信息,在KRaft中,有兩個命令常用命令腳本,kafka-dump-log.sh和kakfa-metadata-shell.sh需要我們來進行關注,因為我們可以通過這兩個腳本來查看kafka當中保存的元數據信息。
Kafka-dump-log.sh腳本來導出元數據信息
KRaft模式下,所有的元數據信息都保存到了一個內部的topic上面,叫做@metadata,例如Broker的信息,Topic的信息等,我們都可以去到這個topic上面進行查看,我們可以通過kafka-dump-log.sh這個腳本來進行查看該topic的信息。
Kafka-dump-log.sh是一個之前就有的工具,用來查看Topic的的文件內容。這工具加了一個參數--cluster-metadata-decoder用來,查看元數據日志,如下所示:
[hadoop@bigdata01 kafka_2.12-3.1.]$ cd /opt/install/kafka_2.12-3.1.
[hadoop@bigdata01 kafka_2.12-3.1.]$ bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files /opt/install/kafka_2.12-3.1./topiclogs/__cluster_metadata-/00000000000000000000.index,/opt/install/kafka_2.12-3.1./topiclogs/__cluster_metadata-/00000000000000000000.log >>/opt/metadata.txt
kafka-metadata-shell.sh直接查看元數據信息
平時我們用zk的時候,習慣了用zk命令行查看數據,簡單快捷。bin目錄下自帶了kafka-metadata-shell.sh工具,可以允許你像zk一樣方便的查看數據。
使用kafka-metadata-shell.sh腳本進入kafka的元數據客戶端
[hadoop@bigdata01 kafka_2.12-3.1.]$ bin/kafka-metadata-shell.sh --snapshot /opt/install/kafka_2.12-3.1./topiclogs/__cluster_metadata-/00000000000000000000.log
四、Raft算法介紹
raft算法中文版本翻譯介紹:
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
著名的CAP原則又稱CAP定理的提出,真正奠基了分布式系統的誕生,CAP定理指的是在一個分布式系統中,[一致性]、[可用性](Availability)、[分區容錯性](Partition tolerance),這三個要素最多只能同時實現兩點,不可能三者兼顧(nosql)。
分布式系統為了提高系統的可靠性,一般都會選擇使用多副本的方式來進行實現,例如hdfs當中數據的多副本,kafka集群當中分區的多副本等,但是一旦有了多副本的話,那么久面臨副本之間一致性的問題,而一致性算法就是 用于解決分布式環境下多副本的數據一致性的問題。業界最著名的一致性算法就是大名鼎鼎的Paxos,但是Paxos比較晦澀難懂,不太容易理解,所以還有一種叫做Raft的算法,更加簡單容易理解的實現了一致性算法。
(一)Raft協議的工作原理
Raft協議當中的角色分布
Raft協議將分布式系統當中的角色分為Leader(領導者),Follower(跟從者)以及Candidate(候選者)
Leader:主節點的角色,主要是接收客戶端請求,并向Follower同步日志,當日志同步到過半及以上節點之后,告訴follower進行提交日志。
Follower:從節點的角色,接受并持久化Leader同步的日志,在Leader通知可以提交日志之后,進行提交保存的日志。
Candidate:Leader選舉過程中的臨時角色。
Raft協議當中的底層原理
Raft協議當中會選舉出Leader節點,Leader作為主節點,完全負責replicate log的管理。Leader負責接受所有客戶端的請求,然后復制到Follower節點,如果leader故障,那么follower會重新選舉leader,Raft協議的一致性,概括主要可以分為以下三個重要部分:
Leader選舉
日志復制
安全性
其中Leader選舉和日志復制是Raft協議當中最為重要的。
Raft協議要求系統當中,任意一個時刻,只有一個leader,正常工作期間,只有Leader和Follower角色,并且Raft協議采用了類似網絡租期的方式來進行管理維護整個集群,Raft協議將時間分為一個個的時間段(term),也叫作任期,每一個任期都會選舉一個Leader來管理維護整個集群,如果這個時間段的Leader宕機,那么這一個任期結束,繼續重新選舉leader。
Raft算法將時間劃分成為任意不同長度的任期(term)。任期用連續的數字進行表示。每一個任期的開始都是一次選舉(election),一個或多個候選人會試圖成為領導人。如果一個候選人贏得了選舉,它就會在該任期的剩余時間擔任領導人。在某些情況下,選票會被瓜分,有可能沒有選出領導人,那么,將會開始另一個任期,并且立刻開始下一次選舉。Raft算法保證在給定的一個任期最多只有一個領導人。
Leader選舉的過程
Raft使用心跳來進行觸發leader選舉,當服務器啟動時,初始化為follower角色。leader向所有Follower發送周期性心跳,如果Follower在選舉超時間內沒有收到Leader的心跳,就會認為leader宕機,稍后發起leader的選舉。
每個Follower都會有一個倒計時時鐘,是一個隨機的值,表示的是Follower等待成為Leader的時間,倒計時時鐘先跑完,就會當選成為Leader,這樣做得好處就是每一個節點都有機會成為Leader。
當滿足以下三個條件之一時,Quorum中的某個節點就會觸發選舉:
向Leader發送Fetch請求后,在超時閾值quorum.fetch.timeout.ms之后仍然沒有得到Fetch響應,表示Leader疑似失敗。
從當前Leader收到了EndQuorumEpoch請求,表示Leader已退位。
Candidate狀態下,在超時閾值quorum.election.timeout.ms之后仍然沒有收到多數票,也沒有Candidate贏得選舉,表示此次選舉作廢,重新進行選舉。
具體詳細過程實現描述如下:
增加節點本地的current term,切換到candidate狀態。
自己給自己投一票。
給其他節點發送RequestVote RPCs,要求其他節點也投自己一票。
等待其他節點的投票回復。
整個過程中的投票過程可以用下圖進行表述。
leader節點選舉的限制
每個節點只能投一票,投給自己或者投給別人。
候選人所知道的日志信息,一定不能比自己的更少,即能被選舉成為leader節點,一定包含了所有已經提交的日志。
先到先得的原則
數據一致性保證(日志復制機制)
前面通過選舉機制之后,選舉出來了leader節點,然后leader節點對外提供服務,所有的客戶端的請求都會發送到leader節點,由leader節點來調度這些并發請求的處理順序,保證所有節點的狀態一致,leader會把請求作為日志條目(Log entries)加入到他的日志當中,然后并行的向其他服務器發起AppendEntries RPC復制日志條目。當這條請求日志被成功復制到大多數服務器上面之后,Leader將這條日志應用到它的狀態機并向客戶端返回執行結果。
客戶端的每個請求都包含被復制狀態機執行的指令
leader將客戶端請求作為一條心得日志添加到日志文件中,然后并行發起RPC給其他的服務器,讓他們復制這條信息到自己的日志文件中保存。
如果這條日志被成功復制,也就是大部分的follower都保存好了執行指令日志,leader就應用這條日志到自己的狀態機中,并返回給客戶端。
如果follower宕機或者運行緩慢或者數據丟失,leader會不斷地進行重試,直至所有在線的follower都成功復制了所有的日志條目。
與維護Consumer offset的方式類似,脫離ZK之后的Kafka集群將元數據視為日志,保存在一個內置的Topic中,且該Topic只有一個Partition。
元數據日志的消息格式與普通消息沒有太大不同,但必須攜帶Leader的紀元值(即之前的Controller epoch):
Record => Offset LeaderEpoch ControlType Key Value Timestamp
這樣,Follower以拉模式復制Leader日志,就相當于以Consumer角色消費元數據Topic,符合Kafka原生的語義。
那么在KRaft協議中,是如何維護哪些元數據日志已經提交——即已經成功復制到多數的Follower節點上的呢?Kafka仍然借用了原生副本機制中的概念——high watermark(HW,高水位線)保證日志不會丟失,HW的示意圖如下。
狀態機說明:
要讓所有節點達成一致性的狀態,大部分都是基于復制狀態機來實現的(Replicated state machine)
簡單來說就是:初始相同的狀態+相同的輸入過程=相同的結束狀態,這個其實也好理解,就類似于一對雙胞胎,出生時候就長得一樣,然后吃的喝的用的穿的都一樣,你自然很難分辨。其中最重要的就是一定要注意中間的相同輸入過程,各個不同節點要以相同且確定性的函數來處理輸入,而不要引入一個不確定的值。使用replicated log來實現每個節點都順序的寫入客戶端請求,然后順序的處理客戶端請求,最終就一定能夠達到最終一致性。
狀態機安全性保證:
在安全性方面,KRaft與傳統Raft的選舉安全性、領導者只追加、日志匹配和領導者完全性保證都是幾乎相同的。下面只簡單看看狀態機安全性是如何保證的,仍然舉論文中的極端例子:
在時刻a,節點S1是Leader,epoch=2的日志只復制給了S2就崩潰了。
在時刻b,S5被選舉為Leader,epoch=3的日志還沒來得及復制,也崩潰了。
在時刻c,S1又被選舉為Leader,繼續復制日志,將epoch=2的日志給了S3。此時該日志復制給了多數節點,但還未提交。
在時刻d,S1又崩潰,并且S5重新被選舉為領導者,將epoch=3的日志復制給S0~S4。
此時日志與新Leader S5的日志發生了沖突,如果按上圖中d1的方式處理,消息2就會丟失。傳統Raft協議的處理方式是:在Leader任期開始時,立刻提交一條空的日志,所以上圖中時刻c的情況不會發生,而是如同d2一樣先提交epoch=4的日志,連帶提交epoch=2的日志。
與傳統Raft不同,KRaft附加了一個較強的約束:當新的Leader被選舉出來,但還沒有成功提交屬于它的epoch的日志時,不會向前推進HW。也就是說,即使上圖中時刻c的情況發生了,消息2也被視為沒有成功提交,所以按照d1方式處理是安全的。
日志格式說明:
所有節點持久化保存在本地的日志,大概就是類似于這個樣子:
上圖顯示,共有八條日志數據,其中已經提交了7條,提交的日志都將通過狀態機持久化到本地磁盤當中,防止宕機。
日志復制的保證機制
如果兩個節點不同的日志文件當中存儲著相同的索引和任期號,那么他們所存儲的命令是相同的。(原因:leader最多在一個任期里的一個日志索引位置創建一條日志條目,日志條目所在的日志位置從來不會改變)。
如果不同日志中兩個條目有著相同的索引和任期號,那么他們之前的所有條目都是一樣的(原因:每次RPC發送附加日志時,leader會把這條日志前面的日志下標和任期號一起發送給follower,如果follower發現和自己的日志不匹配,那么就拒絕接受這條日志,這個稱之為一致性檢查)
日志的不正常情況
一般情況下,Leader和Followers的日志保持一致,因此Append Entries一致性檢查通常不會失敗。然而,Leader崩潰可能會導致日志不一致:舊的Leader可能沒有完全復制完日志中的所有條目。
下圖闡述了一些Followers可能和新的Leader日志不同的情況。一個Follower可能會丟失掉Leader上的一些條目,也有可能包含一些Leader沒有的條目,也有可能兩者都會發生。丟失的或者多出來的條目可能會持續多個任期。
如何保證日志的正常復制
如果出現了上述leader宕機,導致follower與leader日志不一致的情況,那么就需要進行處理,保證follower上的日志與leader上的日志保持一致,leader通過強制follower復制它的日志來處理不一致的問題,follower與leader不一致的日志會被強制覆蓋。leader為了最大程度的保證日志的一致性,且保證日志最大量,leader會尋找follower與他日志一致的地方,然后覆蓋follower之后的所有日志條目,從而實現日志數據的一致性。
具體的操作就是:leader會從后往前不斷對比,每次Append Entries失敗后嘗試前一個日志條目,直到成功找到每個Follower的日志一致的位置點,然后向該Follower所在位置之后的條目進行覆蓋。
詳細過程如下:
Leader維護了每個Follower節點下一次要接收的日志的索引,即nextIndex。
Leader選舉成功后將所有Follower的nextIndex設置為自己的最后一個日志條目+1。
Leader將數據推送給Follower,如果Follower驗證失?。╪extIndex不匹配),則在下一次推送日志時縮小nextIndex,直到nextIndex驗證通過。
總結一下就是:當leader和follower日志沖突的時候,leader將校驗 follower最后一條日志是否和leader匹配,如果不匹配,將遞減查詢,直到匹配,匹配后,刪除沖突的日志。這樣就實現了主從日志的一致性。
(二)Raft協議算法代碼實現
前面我們已經大致了解了Raft協議算法的實現原理,如果我們要自己實現一個Raft協議的算法,其實就是將我們講到的理論知識給翻譯成為代碼的過程,具體的開發需要考慮的細節比較多,代碼量肯定也比較大,好在有人已經實現了Raft協議的算法了,我們可以直接拿過來使用。
創建maven工程并導入jar包地址如下:
<dependencies>
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.github.wenweihu86.rpc</groupId>
<artifactId>rpc-java</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>5.1.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
定義Server端代碼實現:
public class Server1 {
public static void main(String[] args) {
// parse args
// peers, format is "host:port:serverId,host2:port2:serverId2"
//localhost:16010:1,localhost:16020:2,localhost:16030:3 localhost:16010:1
String servers = "localhost:16010:1,localhost:16020:2,localhost:16030:3";
// local server
RaftMessage.Server localServer = parseServer("localhost:16010:1");
String[] splitArray = servers.split(",");
List<RaftMessage.Server> serverList = new ArrayList<>();
for (String serverString : splitArray) {
RaftMessage.Server server = parseServer(serverString);
serverList.add(server);
}
// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
// 設置Raft選項,比如:
// just for test snapshot
RaftOptions raftOptions = new RaftOptions();
/* raftOptions.setSnapshotMinLogSize(10 * 1024);
raftOptions.setSnapshotPeriodSeconds(30);
raftOptions.setMaxSegmentFileSize(1024 * 1024);*/
// 應用狀態機
ExampleStateMachine stateMachine = new ExampleStateMachine(raftOptions.getDataDir());
// 初始化RaftNode
RaftNode raftNode = new RaftNode(raftOptions, serverList, localServer, stateMachine);
raftNode.getLeaderId();
// 注冊Raft節點之間相互調用的服務
RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);
server.registerService(raftConsensusService);
// 注冊給Client調用的Raft服務
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);
// 注冊應用自己提供的服務
ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine);
server.registerService(exampleService);
// 啟動RPCServer,初始化Raft節點
server.start();
raftNode.init();
}
private static RaftMessage.Server parseServer(String serverString) {
String[] splitServer = serverString.split(":");
String host = splitServer[];
Integer port = Integer.parseInt(splitServer[1]);
Integer serverId = Integer.parseInt(splitServer[2]);
RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder()
.setHost(host).setPort(port).build();
RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder();
RaftMessage.Server server = serverBuilder.setServerId(serverId).setEndPoint(endPoint).build();
return server;
}
}
定義客戶端代碼實現如下:
public class ClientMain {
public static void main(String[] args) {
// parse args
String ipPorts = args[];
String key = args[1];
String value = null;
if (args.length > 2) {
value = args[2];
}
// init rpc client
RPCClient rpcClient = new RPCClient(ipPorts);
ExampleService exampleService = RPCProxy.getProxy(rpcClient, ExampleService.class);
final JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();
// set
if (value != null) {
ExampleMessage.SetRequest setRequest = ExampleMessage.SetRequest.newBuilder()
.setKey(key).setValue(value).build();
ExampleMessage.SetResponse setResponse = exampleService.set(setRequest);
try {
System.out.printf("set request, key=%s value=%s response=%s\n",
key, value, printer.print(setResponse));
} catch (Exception ex) {
ex.printStackTrace();
}
} else {
// get
ExampleMessage.GetRequest getRequest = ExampleMessage.GetRequest.newBuilder().setKey(key).build();
ExampleMessage.GetResponse getResponse = exampleService.get(getRequest);
try {
String value1 = getResponse.getValue();
System.out.println(value1);
System.out.printf("get request, key=%s, response=%s\n",
key, printer.print(getResponse));
} catch (Exception ex) {
ex.printStackTrace();
}
}
rpcClient.stop();
}
}
先啟動服務端,然后啟動客戶端,就可以將實現客戶端向服務端發送消息,并且服務端會向三臺機器進行保存消息了。
五、Kafka常見問題
(一)消息隊列模型知道嗎?Kafka是怎么做到支持這兩種模型的?
對于傳統的消息隊列系統支持兩個模型:
點對點:也就是消息只能被一個消費者消費,消費完后消息刪除。
發布訂閱:相當于廣播模式,消息可以被所有消費者消費。
kafka其實就是通過Consumer Group同時支持了這兩個模型。如果說所有消費者都屬于一個Group,消息只能被同一個Group內的一個消費者消費,那就是點對點模式。如果每個消費者都是一個單獨的Group,那么就是發布訂閱模式。
(二)說說Kafka通信過程原理嗎?
首先kafka broker啟動的時候,會去向Zookeeper注冊自己的ID(創建臨時節點),這個ID可以配置也可以自動生成,同時會去訂閱Zookeeper的brokers/ids路徑,當有新的broker加入或者退出時,可以得到當前所有broker信。
生產者啟動的時候會指定bootstrap.servers,通過指定的broker地址,Kafka就會和這些broker創建TCP連接(通常我們不用配置所有的broker服務器地址,否則kafka會和配置的所有broker都建立TCP連接)
隨便連接到任何一臺broker之后,然后再發送請求獲取元數據信息(包含有哪些主題、主題都有哪些分區、分區有哪些副本,分區的Leader副本等信息)
接著就會創建和所有broker的TCP連接。
之后就是發送消息的過程。
消費者和生產者一樣,也會指定bootstrap.servers屬性,然后選擇一臺broker創建TCP連接,發送請求找到協調者所在的broker。
然后再和協調者broker創建TCP連接,獲取元數據。
根據分區Leader節點所在的broker節點,和這些broker分別創建連接。
最后開始消費消息。
(三)發送消息時如何選擇分區的?
主要有兩種方式:
輪詢,按照順序消息依次發送到不同的分區。
隨機,隨機發送到某個分區。
如果消息指定key,那么會根據消息的key進行hash,然后對partition分區數量取模,決定落在哪個分區上,所以,對于相同key的消息來說,總是會發送到同一個分區上,也是我們常說的消息分區有序性。
很常見的場景就是我們希望下單、支付消息有順序,這樣以訂單ID作為key發送消息就達到了分區有序性的目的。
如果沒有指定key,會執行默認的輪詢負載均衡策略,比如第一條消息落在P0,第二條消息落在P1,然后第三條又在P1。
除此之外,對于一些特定的業務場景和需求,還可以通過實現Partitioner接口,重寫configure和partition方法來達到自定義分區的效果。
(四)為什么需要分區?有什么好處?
這個問題很簡單,如果說不分區的話,我們發消息寫數據都只能保存到一個節點上,這樣的話就算這個服務器節點性能再好最終也支撐不住。
實際上分布式系統都面臨這個問題,要么收到消息之后進行數據切分,要么提前切分,kafka正是選擇了前者,通過分區可以把數據均勻地分布到不同的節點。
分區帶來了負載均衡和橫向擴展的能力。
發送消息時可以根據分區的數量落在不同的Kafka服務器節點上,提升了并發寫消息的性能,消費消息的時候又和消費者綁定了關系,可以從不同節點的不同分區消費消息,提高了讀消息的能力。
另外一個就是分區又引入了副本,冗余的副本保證了Kafka的高可用和高持久性。
(五)詳細說說消費者組和消費者重平衡?
Kafka中的消費者組訂閱topic主題的消息,一般來說消費者的數量最好要和所有主題分區的數量保持一致最好(舉例子用一個主題,實際上當然是可以訂閱多個主題)。
當消費者數量小于分區數量的時候,那么必然會有一個消費者消費多個分區的消息。
而消費者數量超過分區的數量的時候,那么必然會有消費者沒有分區可以消費。
所以,消費者組的好處一方面在上面說到過,可以支持多種消息模型,另外的話根據消費者和分區的消費關系,支撐橫向擴容伸縮。
當我們知道消費者如何消費分區的時候,就顯然會有一個問題出現了,消費者消費的分區是怎么分配的,有先加入的消費者時候怎么辦?
舊版本的重平衡過程主要通過ZK監聽器的方式來觸發,每個消費者客戶端自己去執行分區分配算法。
新版本則是通過協調者來完成,每一次新的消費者加入都會發送請求給協調者去獲取分區的分配,這個分區分配的算法邏輯由協調者來完成。
而重平衡Rebalance就是指的有新消費者加入的情況,比如剛開始我們只有消費者A在消費消息,過了一段時間消費者B和C加入了,這時候分區就需要重新分配,這就是重平衡,也可以叫做再平衡,但是重平衡的過程和我們的GC時候STW很像,會導致整個消費群組停止工作,重平衡期間都無法消息消息。
另外,發生重平衡并不是只有這一種情況,因為消費者和分區總數是存在綁定關系的,上面也說了,消費者數量最好和所有主題的分區總數一樣。
那只要消費者數量、主題數量(比如用的正則訂閱的主題)、分區數量任何一個發生改變,都會觸發重平衡。
下面說說重平衡的過程。
重平衡的機制依賴消費者和協調者之間的心跳來維持,消費者會有一個獨立的線程去定時發送心跳給協調者,這個可以通過參數heartbeat.interval.ms來控制發送心跳的間隔時間。
每個消費者第一次加入組的時候都會向協調者發送JoinGroup請求,第一個發送這個請求的消費者會成為“群主”,協調者會返回組成員列表給群主。
群主執行分區分配策略,然后把分配結果通過SyncGroup請求發送給協調者,協調者收到分區分配結果。
其他組內成員也向協調者發送SyncGroup,協調者把每個消費者的分區分配分別響應給他們。
(六)具體講講分區分配策略?
主要有3種分配策略
Range
對分區進行排序,排序越靠前的分區能夠分配到更多的分區。
比如有3個分區,消費者A排序更靠前,所以能夠分配到P0\P1兩個分區,消費者B就只能分配到一個P2。
如果是4個分區的話,那么他們會剛好都是分配到2個。
但是這個分配策略會有點小問題,他是根據主題進行分配,所以如果消費者組訂閱了多個主題,那就有可能導致分區分配不均衡。
比如下圖中兩個主題的P0\P1都被分配給了A,這樣A有4個分區,而B只有2個,如果這樣的主題數量越多,那么不均衡就越嚴重。
RoundRobin
也就是我們常說的輪詢了,這個就比較簡單了,不畫圖你也能很容易理解。
這個會根據所有的主題進行輪詢分配,不會出現Range那種主題越多可能導致分區分配不均衡的問題。
P0->A,P1->B,P1->A。。。以此類推
Sticky
這個從字面看來意思就是粘性策略,大概是這個意思。主要考慮的是在分配均衡的前提下,讓分區的分配更小的改動。
比如之前P0\P1分配給消費者A,那么下一次盡量還是分配給A。
這樣的好處就是連接可以復用,要消費消息總是要和broker去連接的,如果能夠保持上一次分配的分區的話,那么就不用頻繁的銷毀創建連接了。
(七)如何保證消息可靠性?
生產者發送消息丟失
kafka支持3種方式發送消息,這也是常規的3種方式,發送后不管結果、同步發送、異步發送,基本上所有的消息隊列都是這樣玩的。
發送并忘記,直接調用發送send方法,不管結果,雖然可以開啟自動重試,但是肯定會有消息丟失的可能。
同步發送,同步發送返回Future對象,我們可以知道發送結果,然后進行處理。
異步發送,發送消息,同時指定一個回調函數,根據結果進行相應的處理。
為了保險起見,一般我們都會使用異步發送帶有回調的方式進行發送消息,再設置參數為發送消息失敗不停地重試。
acks=all,這個參數有可以配置0|1|all。
0表示生產者寫入消息不管服務器的響應,可能消息還在網絡緩沖區,服務器根本沒有收到消息,當然會丟失消息。
1表示至少有一個副本收到消息才認為成功,一個副本那肯定就是集群的Leader副本了,但是如果剛好Leader副本所在的節點掛了,Follower沒有同步這條消息,消息仍然丟失了。
配置all的話表示所有ISR都寫入成功才算成功,那除非所有ISR里的副本全掛了,消息才會丟失。
retries=N,設置一個非常大的值,可以讓生產者發送消息失敗后不停重試
Kafka 自身消息丟失。
kafka因為消息寫入是通過PageCache異步寫入磁盤的,因此仍然存在丟失消息的可能。
因此針對kafka自身丟失的可能設置參數:
replication.factor=N,設置一個比較大的值,保證至少有2個或者以上的副本。
min.insync.replicas=N,代表消息如何才能被認為是寫入成功,設置大于1的數,保證至少寫入1個或者以上的副本才算寫入消息成功。
unclean.leader.election.enable=false,這個設置意味著沒有完全同步的分區副本不能成為Leader副本,如果是true的話,那些沒有完全同步Leader的副本成為Leader之后,就會有消息丟失的風險。
消費者消息丟失
消費者丟失的可能就比較簡單,關閉自動提交位移即可,改為業務處理成功手動提交。
因為重平衡發生的時候,消費者會去讀取上一次提交的偏移量,自動提交默認是每5秒一次,這會導致重復消費或者丟失消息。
enable.auto.commit=false,設置為手動提交。
還有一個參數我們可能也需要考慮進去的:
auto.offset.reset=earliest,這個參數代表沒有偏移量可以提交或者broker上不存在偏移量的時候,消費者如何處理。earliest代表從分區的開始位置讀取,可能會重復讀取消息,但是不會丟失,消費方一般我們肯定要自己保證冪等,另外一種latest表示從分區末尾讀取,那就會有概率丟失消息。
綜合這幾個參數設置,我們就能保證消息不會丟失,保證了可靠性。
(八)聊聊副本和它的同步原理吧?
Kafka副本的之前提到過,分為Leader副本和Follower副本,也就是主副本和從副本,和其他的比如Mysql不一樣的是,Kafka中只有Leader副本會對外提供服務,Follower副本只是單純地和Leader保持數據同步,作為數據冗余容災的作用。
在Kafka中我們把所有副本的集合統稱為AR(Assigned Replicas),和Leader副本保持同步的副本集合稱為ISR(InSyncReplicas)。
ISR是一個動態的集合,維持這個集合會通過replica.lag.time.max.ms參數來控制,這個代表落后Leader副本的最長時間,默認值10秒,所以只要Follower副本沒有落后Leader副本超過10秒以上,就可以認為是和Leader同步的(簡單可以認為就是同步時間差)。
另外還有兩個關鍵的概念用于副本之間的同步:
HW(High Watermark):高水位,也叫做復制點,表示副本間同步的位置。如下圖所示,0~4綠色表示已經提交的消息,這些消息已經在副本之間進行同步,消費者可以看見這些消息并且進行消費,4~6黃色的則是表示未提交的消息,可能還沒有在副本間同步,這些消息對于消費者是不可見的。
LEO(Log End Offset):下一條待寫入消息的位移
副本間同步的過程依賴的就是HW和LEO的更新,以他們的值變化來演示副本同步消息的過程,綠色表示Leader副本,黃色表示Follower副本。
首先,生產者不停地向Leader寫入數據,這時候Leader的LEO可能已經達到了10,但是HW依然是0,兩個Follower向Leader請求同步數據,他們的值都是0。
此時,Follower再次向Leader拉取數據,這時候Leader會更新自己的HW值,取Follower中的最小的LEO值來更新。
之后,Leader響應自己的HW給Follower,Follower更新自己的HW值,因為又拉取到了消息,所以再次更新LEO,流程以此類推。
(九)Kafka為什么快?
主要是3個方面:
順序IO
kafka寫消息到分區采用追加的方式,也就是順序寫入磁盤,不是隨機寫入,這個速度比普通的隨機IO快非常多,幾乎可以和網絡IO的速度相媲美。
Page Cache和零拷貝
kafka在寫入消息數據的時候通過mmap內存映射的方式,不是真正立刻寫入磁盤,而是利用操作系統的文件緩存PageCache異步寫入,提高了寫入消息的性能,另外在消費消息的時候又通過sendfile實現了零拷貝。
批量處理和壓縮
Kafka在發送消息的時候不是一條條的發送的,而是會把多條消息合并成一個批次進行處理發送,消費消息也是一個道理,一次拉取一批次的消息進行消費。
并且Producer、Broker、Consumer都使用了優化后的壓縮算法,發送和消息消息使用壓縮節省了網絡傳輸的開銷,Broker存儲使用壓縮則降低了磁盤存儲的空間。
參考資料:
1.《深入理解Kafka:核心設計實踐原理》
2.狀態機程序設計套路
3.raft算法源碼
4.https://www.bbsmax.com/A/QW5Y3kaBzm/
作者簡介

屈志平
騰訊數據研發工程師
騰訊數據研發工程師,目前負責QQ瀏覽器、搜狗小說、搜狗瀏覽器、bingo等多款產品的數據建設。有豐富的數據治理、數倉建設、數據平臺平臺相關的經驗。