導語|Controller作為Apache Kafka的核心組件,本文將從背景、原理以及源碼與監控等方面來深入剖析Kafka Controller,希望帶領大家去了解Controller在整個Kafka集群中的作用。
Controller,是Apache Kafka的核心組件非常重要。它的主要作用是在Apache Zookeeper的幫助下管理和協調控制整個Kafka集群。
在整個Kafka集群中,如果Controller故障異常,有可能會影響到生產和消費。所以,我們需要對其狀態、選舉、日誌等做全面的監控。
Controller,是Apache Kafka的核心組件。它的主要作用是在Apache Zookeeper的幫助下管理和協調控制整個Kafka集群。
集群中的任意一台Broker都能充當Controller的角色,但是,在整個集群運行過程中,只能有一個Broker成為Controller。也就是說,每個正常運行的Kafka集群,在任何時刻都有且只有一個Controller。


其中比較重要的數據有:
所有主題信息。包括具體的分區信息,比如領導者副本是誰,ISR集合中有哪些副本等。
所有Broker信息。包括當前都有哪些運行中的Broker,哪些正在關閉中的Broker等。
所有涉及運維任務的分區。包括當前正在進行Preferred領導者選舉以及分區重分配的分區列表。
這些數據其實在ZooKeeper中也保存了一份。每當控制器初始化時,它都會從ZooKeeper上讀取對應的元數據並填充到自己的緩存中。
而Broker上元數據的更新都是由Controller通知完成的,Broker並不從Zookeeper獲取元數據信息。
Controller職責大致分為5種:
主題管理,分區重分配,Preferred leader選舉,集群成員管理(Broker上下線),數據服務(向其他Broker提供數據服務) 。
它們分別是:
UpdateMetadataRequest:更新元數據請求。topic分區狀態經常會發生變更(比如leader重新選舉了或副本集合變化了等)。由於當前clients只能與分區的leader Broker進行交互,那麼一旦發生變更,controller會將最新的元數據廣播給所有存活的Broker。具體方式就是給所有Broker發送UpdateMetadataRequest請求。
CreateTopics: 創建topic請求。當前不管是通過API方式、腳本方式抑或是CreateTopics請求方式來創建topic,做法幾乎都是在Zookeeper的/brokers/topics下創建znode來觸發創建邏輯,而controller會監聽該path下的變更來執行真正的「創建topic」邏輯。
DeleteTopics:刪除topic請求。和CreateTopics類似,也是通過創建Zookeeper下的/admin/delete_topics/<topic>節點來觸發刪除topic,controller執行真正的邏輯。
分區重分配:即kafka-reassign-partitions腳本做的事情。同樣是與Zookeeper結合使用,腳本寫入/admin/reassign_partitions節點來觸 發,controller負責按照方案分配分區。
Preferred leader分配:preferred leader選舉當前有兩種觸發方式:自動觸發(auto.leader.rebalance.enable=true)和kafka-preferred-replica-election腳本觸發。兩者「玩法」相同,向Zookeeper的/admin/preferred_replica_election寫數據,controller提取數據執行preferred leader分配。
分區擴展:即增加topic分區數。標準做法也是通過kafka-reassign-partitions腳本完成,不過用戶可直接往Zookeeper中寫數據來實現,比如直接把新增分區的副本集合寫入到/brokers/topics/<topic>下,然後controller會為你自動地選出leader並增加分區。
集群擴展:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務發現的工作。
broker崩潰:同樣地,controller通過Zookeeper可實時偵測broker狀態。一旦有broker掛掉了,controller可立即感知並為受影響分區選舉新的leader。
ControlledShutdown:broker除了崩潰,還能「優雅」地退出。broker一旦自行終止,controller會接收到一個 ControlledShudownRequest請求,然後controller會妥善處理該請求並執行各種收尾工作。
Controller leader選舉:controller必然要提供自己的leader選舉以防這個全局唯一的組件崩潰宕機導致服務中斷。這個功能也是通過 Zookeeper的幫助實現的。
源碼位置可以看後面段落9源碼的說明。
五、Broker如何成為Controller
和解決可能的腦裂問題
(一)Broker如何成為Controller

最先在Zookeeper上創建臨時節點/controller成功的Broker就是Controller。
源碼路徑(Kafka2.2):
Kafka#main->KafkaServerStartable#startup()->KafkaServer#startup()->KafkaController#startup()->eventManager.put(Startup)->elect()-> zkClient.registerControllerAndIncrementControllerEpoch


Controller重度依賴Zookeeper,依賴zookeepr保存元數據,依賴zookeeper進行服務發現。Controller大量使用Watch功能實現對集群的協調管理。
當broker節點因故障離開Kafka集群時,broker中存在的leader分區將不可用(因為客戶端只對leader分區進行讀寫)。為了最大限度地減少停機時間,需要快速找到替代的領導分區。Controller可以從zookeeper watch獲取通知信息。Zookeeper給了客戶端監聽znode變化的能力,也就是所謂的watch通知功能。一旦znode節點創建、刪除、子節點數量發生變化,或者znode中存儲的數據本身發生變化,Zookeeper會通過節點變化處理程序顯式通知客戶端。當Broker宕機或主動關閉時,Broker與Zookeeper的會話結束,znode會被自動刪除。同樣的,Zookeeper的watch機制把這個變化推送給Controller,讓Controller知道有Broker down或者up,這樣Controller就可以進行後續的協調操作。
Controller將收到通知並對其採取行動,以確定Broker上的哪些分區將成為Leader partition。然後,它會通知每個相關的Broker,或者Broker上的topic partition將成為leader partition,或者LeaderAndIsrRequest從新的leader分區複製數據。
(二)如何避免Controller出現裂腦
如果Controller所在的Broker故障,Kafka集群必須有新的Controller,否則集群將無法正常工作。這兒存在一個問題。很難確定Broker是宕機還是只是暫時的故障。但是,為了使集群正常運行,必須選擇新的Controller。如果之前更換的Controller又正常了,不知道自己已經更換了,那麼集群中就會出現兩個Controller。
其實這種情況是很容易發生的。例如,由於垃圾回收(GC),一個Controller被認為是死的,並選擇了一個新的控制器。在GC的情況下,在原Controller眼裡沒有任何變化,Broker甚至不知道自己已經被暫停了。因此,它將繼續充噹噹前Controller,這在分布式系統中很常見,稱為裂腦。

現在,集群中有兩個Controller,可能會一起發出相互衝突的事件,這會導致腦裂。可能會導致嚴重的不一致。所以需要一種方法來區分誰是集群的最新Controller。Kafka是通過使用epoch number來處理,epoch number只是一個單調遞增的數。第一次選擇控制器時,epoch number值為1。如果再次選擇新控制器,epoch number為2,依次單調遞增。
每個新選擇的Controller通過zookeeper的條件遞增操作獲得一個新的更大的epoch number。當其他Broker知道當前的epoch number時,如果他們從Controller收到包含舊(較小)epoch number的消息,則它們將被忽略。即Broker根據最大的epoch number來區分最新的Controller。
epoch number記錄在Zookeepr的一個永久節點controller_epoch。

上圖中,Broker3向Broker1下發命令:將Broker1上的partitionA做為leader,消息的epoch number值為1,同時Broker2也向Broker1發送同樣的命令。不同的是,消息的epoch number值為2,此時broker1隻監聽broker2的命令(由於其epoch號大),而會忽略broker3的命令,以免發生腦裂。
在Kafka2.2之前
網絡處理模型:Kafka Server在啟動時會初始化SocketServer、KafkaApis和KafkaRequestHandlerPool對象,這也是Server網絡處理模型的主要組成部分。Kafka Server的網絡處理模型也是基於Java NIO機制實現的,實現模式與Reactor模式類似。

如上圖,所有請求共享一個requestQueue隊列。
問題:當前Broker對入站請求類型不做任何優先級處理。
不論是PRODUCE請求、FETCH請求還是Controller類的請求。對Controller發送的消息非常不公平,因為這個類請求應該優先級更高。
這就可能造成一個問題:即clients發送的數據類請求積壓導致controller推遲了管理類請求的處理。設想這樣的場景。假設controller向broker廣播了leader發生變更。於是新leader開始接收clients端請求,而同時老leader所在的broker由於出現了數據類請求的積壓使得它一直忙於處理這些請求而無法處理controller發來的LeaderAndIsrRequest請求,因此這是就會出現「雙主」的情況——也就是所謂的腦裂。
在Kafka 2.2
將控制器發送的請求與普通數據類請求分開處理,源碼SocketServer.scala#startup()->KafkaServer.scala。

在0.11版本上也做了大的改進,會在後面段落8中說明。
在整個集群運行過程中,只能有一個Broker成為Controller。所以要監控Controller的數量以及Controller的變更史。
可以用Kafka的JMXTool,進行輕量級的監控。
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://"${BrokerIP}":"${JMXPort}"/jmxrmi --object-name kafka.controller:type=KafkaController,name=ActiveControllerCount --date-format "YYYY-MM-dd_HH:mm" --reporting-interval -1 | grep -v type記錄Controller變更歷:
function inter_controller_history(){ #第一次檢測集群Controller if [ ! -f "${clusterID}"_controller_history ]; then awk '/,1$/ {print $0}' "${clusterID}"_controller >> "${clusterID}"_controller_history #記錄Controller變更歷史 else nowController=$(awk '/,1$/ {print $0}' "${clusterID}"_controller | awk -F ',' '{print $1}') LastTimeController=$(tail -n 1 "${clusterID}"_controller_history | awk '/,1$/ {print $0}' | awk -F ',' '{print $1}') if [ "${nowController}_X" != "${LastTimeController}_X" ];then awk '/,1$/ {print $0}' "${clusterID}"_controller >> "${clusterID}"_controller_history msg="${msg_tmp} clusterID:${clusterID} ${ClusterNameCN} Controller From ${LastTimeController} to ${nowController}" echo "$msg" >> $log_file_name send_warning fi fi}監控效果:

通過JMXTool,還可以拉取Kafka的其他指標進行監控。
例如:
under_replicated_partitions有非同步副本監控。OfflinePartitionsCount分區丟失leader監控。
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://${BrokerIP}:"${JMXPort}"/jmxrmi --object-name kafka.controller:type=KafkaController,name=OfflinePartitionsCount --date-format "YYYY-MM-dd_HH:mm" --reporting-interval -1ZooKeeper_SessionState Broker與Zookeeper斷開連接監控。
MessagesInPerSec,進入Broker消費數量監控。
${KAFKA_PATH}/bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://"${BrokerIP}":"${JMXPort}"/jmxrmi --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --date-format "YYYY-MM-dd HH:mm" --attributes Count --reporting-interval -1ISR擴縮容率等。
監控可以有很多方式,這樣做主要是簡單方便,不需要依賴太多監控系統,同時監控程序可以快速部署到海外或者合作夥伴機房。
Kafka中的一台Broker充當Controller的角色,此台Broker不僅對生產者消費者提供服務,還要協調整個集群的管理工作。如果使用0.11版本之前的Kafka而且分區很多時,建議將幾台機器配置為只能成為Controller(當然這裡需要修改源碼,編譯)。
0.11版本之前
同步操作Zookeeper使用同步的API,性能差。當Broker宕機,大量主題分區發生變更時,自動恢復時間長。Controller是一個分區一個分區進行寫入的,對於分區數很多的集群來說,這無疑是個巨大的性能瓶頸。
0.11 版本
異步操作Zookeeper使用async API,寫入提升了10倍。

如果機器性能較好,可以將Zookeeper和Controller部署在相同的機器。Kafka對Zookeeper寫請求比較少。
注意:消費方式有基於Zookeeper消費和基於Broker消息。基於Zookeeper消費,就是將消費位移提交到Zookeeper上,這種方式對Zookeeper有大量寫操作。不要將Zookeeper和其他機器共用。Zookeeper官網上有對讀寫占比的壓測說明:

源碼(基於kafka 2.2)的內容較多:


(一)Controller啟動流程【主要看寫的源碼注釋】/** * KafkaController#startup中為每一個server都會啟動一個eventManager * * 集群首次啟動時,Controller 尚未被選舉出來。 * 於是,Broker 啟動後,首先將 Startup 這 個 ControllerEvent 寫入到事件隊列中,然後啟動對應的事件處理線程和 ControllerChangeHandler ZooKeeper 監聽器, * 最後依賴事件處理線程進行 Controller 的選舉。 * 在源碼中,KafkaController 類的 startup 方法就是做這些事情的。 * 當Broker 啟動時,它 會調用這個方法啟動 ControllerEventThread 線程。值得注意的是,每個 Broker 都需要 做這些事情, * 不是說只有 Controller 所在的 Broker 才需要執行這些邏輯。 * * 首先,startup 方法會註冊 ZooKeeper 狀態變更監聽器,用於監聽 Broker 與 ZooKeeper 之間的會話是否過期。 * 接着,寫入 Startup 事件到事件隊列,然後啟動 ControllerEventThread 線程,開始處理事件隊列中的 Startup 事件。 */def startup() = { /** * controller組件啟動監聽器的方式是在zk上面註冊一個stateChangeHandler * 在KafkaController.startup()方法中首先通過zk註冊監聽事件,監聽StateChangeHandler * registerStateChangeHandler用於session過期後觸發重新選舉 * * 第1步:註冊Zookeeper狀態變更監聽器,它是用於監聽Zookeeper和broker會話過期的, */ zkClient.registerStateChangeHandler(new StateChangeHandler { override val name: String = StateChangeHandlers.ControllerHandler override def afterInitializingSession(): Unit = { eventManager.put(RegisterBrokerAndReelect) } override def beforeInitializingSession(): Unit = { val expireEvent = new Expire eventManager.clearAndPut(expireEvent) // Block initialization of the new session until the expiration event is being handled, // which ensures that all pending events have been processed before creating the new session /** * 阻塞等待時間被處理結束,session過期觸發重新選舉,必須等待選舉這個時間完成Controller才能正常工作 */ expireEvent.waitUntilProcessingStarted() } }) /** Startup是一個ControllerEvent,ControllerEventThread會執行它的process方法 * Startup 類型的 ControllerEvent 被放入到 eventmanager中,被 KafkaController#process 方法調用,在下面有定義 * 在Startup的回調方法process()中,首先在zk中監聽/controller路徑。並且調用elect()進行選舉過程。 * * case object Startup extends ControllerEvent { * def state = ControllerState.ControllerChange * override def process(): Unit = { * zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) * // elect就是嘗試競選controller * elect() * } * } * * 調用elect(),進行選舉,在onControllerFailover()中 * 放入Startup並啟動eventManager後台線程開始選舉, Startup 是個事件 * * put 方法 在 core/src/main/scala/kafka/controller/ControllerEventManager.scala#def put(event: ControllerEvent): Unit = inLock(putLock) { * 第2步:寫入Startup事件到事件隊列 */ eventManager.put(Startup) /** 啟動了ControllerEventManager * KafkaController#startup 中為每一個 server 都會啟動一個 eventManage * 啟動eventManager來處理event queue中的任務 * * ControllerEventThread在 * core/src/main/scala/kafka/controller/ControllerEventManager.scala中定義 * * 第3步:內部啟動ControllerEventThread線程,開始處理事件隊列中的ControllerEvent */ eventManager.start()}(二)Controller選舉流程【主要看寫的源碼注釋】 /** * elect方法是關於Controller選舉的核心方法 * elect就是嘗試競選controller,如果我們當前節點真的被選為controller(onControllerFailover()–故障轉移) * */private def elect(): Unit = { /** 獲取zk /controller節點中的brokerId,沒有返回-1 * 第1步:獲取當前Controller所在Broker的序列號,如果Controller不存在,顯式標記為-1 */ activeControllerId = zkClient.getControllerId.getOrElse(-1) /** * 開始創建臨時節點前檢查,如果/controller節點已經存在,說明已經有broker成為controller, * 因此本broker直接退出controller選舉 * 第2步:如果當前Controller已經選出來了,直接返回即可 */ if (activeControllerId != -1) { /** 在初始化的時候可能會走到這裡,如果當前controller不空,則退出選舉 * [2020-11-06 13:27:50,200] INFO [ControllerEventThread controllerId=19] Starting (kafka.controller.ControllerEventManager$ControllerEventThread) * [2020-11-06 13:27:50,208] DEBUG [Controller id=19(當前機器的Broker ID)] Broker 13 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController) */ debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.") return } try { /** * //所謂選舉,就是搶占zk上面一個節點,如果拋異常說明未能選舉上 * registerControllerAndIncrementControllerEpoch在zk/KafkaZkClient.scala * 創建臨時節點,聲明本broker成為controller * 主要是創建/controller節點 * 嘗試去創建/controller節點,如果創建失敗了(已存在),會在catch里處理 * * registerControllerAndIncrementControllerEpoch 方法在 zk/KafkaZkClient.scala */ val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId) controllerContext.epoch = epoch controllerContext.epochZkVersion = epochZkVersion /** ControllerId就是當前主機的 brokerId */ activeControllerId = config.brokerId /** 未拋異常說明寫入創建成功,本broker榮升為controller */ info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " + s"and epoch zk version is now ${controllerContext.epochZkVersion}") /** 成為Controller後,主要做的工作 * 第4步:執行當選Controller的後續邏輯 */ onControllerFailover() } catch { case e: ControllerMovedException => maybeResign() /** 如果/controller已存在, brokerid就不會是-1 * {"version":1,"brokerid":0,"timestamp":"1582610063256"} */ if (activeControllerId != -1) debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e) else /** 上一屆controller剛下台,節點還沒刪除的情況 */ warn("A controller has been elected but just resigned, this will result in another round of election", e) case t: Throwable => error(s"Error while electing or becoming controller on broker ${config.brokerId}. " + s"Trigger controller movement immediately", t) /** 遇到不可知錯誤,取消zk相關節點的監聽註冊,並調用刪除/controller的zk的node */ triggerControllerMove() }}(三)成為Controller後的初始化工作【主要看寫的源碼注釋】/** * 真正複雜的是broker在成為Controller之後,在onControllerFailover方法中進行的一系列初始化動作 * * .啟動controller的channel manager用於接收請求 * 3.啟動replica的狀態機,監測replica是OnlineReplica還是OfflineReplica的狀態。這裡的offline是指該replica的broker已經掛掉。 * 4.啟動partition的狀態機,監測partition是OnlinePartition還是OfflinePartition。這裡的offline是指該partion的leader已經掛掉。 * 5.啟動自動的leader分配rebalance(如果啟動設置) */ private def onControllerFailover() { info("Registering handlers") // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks /** * 在讀取zk之前註冊brokerchange, topicchange, topicdeletion, logDirEventNoti,isrChange事件, * 從這行及下面事件註冊也看出control在集群中的作用 * 註冊一組childrenChangeHandler,在NodeChildrenChange事件觸發後,會分發給這些handler * handler 監聽的zk節點 事件 ControllerEvent 功能*--------------------------------------------------*brokerChangeHandler /brokers/ids childChange BrokerChange*topicChangeHandler /brokers/topics childChange TopicChange*topicDeletionHandler /admin/delete_topics childChange TopicDeletion*logDirEventNotificationHandler /log_dir_event_notification childChange LogDirEventNotification*isrChangeNotificationHandler /isr_change_notification childChange IsrChangeNotification*partitionReassignmentHandler /admin/reassign_partitions create PartitionReassignment 執行副本重分配*preferredReplicaElectionHandler /admin/preferred_replica_election create PreferredReplicaLeaderElection Preferred leader副本選舉 * * 其中brokerChangeHandler(new BrokerChangeHandler(this,eventManager)為Broker數據量zookeeper監聽器 ) */ /** 註冊各種監聽器 */ val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler, isrChangeNotificationHandler) /** 註冊對broker數據量監聽器 */ childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler) /** * 註冊/admin/preferred_replica_election, /admin/reassign_partitions節點事件處理 * 也是註冊,不過要檢查節點是否存在(這裡不對是否存在做處理,只是保證沒有異常) */ val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler) nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence) info("Deleting log dir event notifications") /** * 刪除節點:/log_dir_event_notification/log_dir_event_xxx,/isr_change_notification/isr_change_xxx節點 */ /** * 刪除log_dir_event_notification這個目錄下面的子節點 * 清理已存在的LogDirEventNotifications在zk上的記錄 */ zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion) info("Deleting isr change notifications") /** * 刪除isr_change_notification這個目錄下面的子節點 * 清理已存在的IsrChangeNotifications在zk上的記錄 */ zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion) info("Initializing controller context") /** * 初始化controller的上下文 * 在initializeControllerContext()中: * 1. 註冊監聽所有topic的partitionModification事件 * 2. 從zk中獲取所有topic的副本分配信息 * 3. 在zk中監聽所有broker的更新情況 * 4. 從zk中讀取topicPartition的leadership,更新本地緩存 * 5. 初始化ControllerChannelManager,為每個broker生成一個後台通信線程用於和broker通信,並啟動後台線程 * 6. 從zk的/admin/reassign_partitions路徑下讀取partition的reassigned信息放入緩存用於後續處理 * * 初始化 controller 相關的變量信息:包括 alive broker 列表、partition 的詳細信息等 */ /** 初始化集群元數據,元數據對象ControllerContext */ initializeControllerContext() /** 初始化 controller 相關的變量信息 */ info("Fetching topic deletions in progress") /** * 獲取所有待刪除的topic * 要刪除的topics和刪除失敗的topics */ val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress() info("Initializing topic deletion manager") /** * 初始化ControllerContext之後,接下來是topicDeletionManager——topic刪除管理器的初始化 * 註:topic刪除只會在delete.topic.enable為true時才能進行,而且分階段進行刪除 * 初始化通過topicDeletionManager,如果isDeleteTopicEnabled則在zk中直接刪除topicsToBeDeleted * * 初始化topic刪除管理器 */ topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion) info("Sending update metadata request") /** * 同步一下live的broker列表 * controller context 初始化結束之後發送請求更新metadata,這是因為需要在brokers能處理LeaderAndIsrRequests前獲取哪些brokers是live的, * * 在 controller contest 初始化之後,我們需要發送 UpdateMetadata 請求在狀態機啟動之前,這是因為 broker 需要從 UpdateMetadata 請求 * 獲取當前存活的 broker list, 因為它們需要處理來自副本狀態機或分區狀態機啟動發送的 LeaderAndIsr 請求 * * 在處理LeaderAndIsrRequest請求之前,先更新所有broker以及所有partition的元數據 * 是為後面的副本狀態機,分區狀態機的啟動做準備,將元數據同步給其它broker,讓它們可以處理LeaderAndIsrRequest請求 */ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty) /** * 啟動副本狀態機,初始化zk中所有副本的狀態 * 如果是online的副本則標記為OnlineReplica狀態,否則標記為ReplicaDeletionIneligible * 生成LeaderAndIsrRequest請求並發送到對應brokerId * * 初始化 replica 的狀態信息: replica 是存活狀態時是 OnlineReplica, 否則是 ReplicaDeletionIneligible */ replicaStateMachine.startup() /** * 啟動分區狀態機,初始化分區狀態至OnlinePartition,對於那些NewPartition,OfflinePartition狀態的分區進行選舉並在zk中更新updateLeaderAndIsr * * 初始化 partition 的狀態信息:如果 leader 所在 broker 是 alive 的,那麼狀態為 OnlinePartition,否則為 OfflinePartition * 並狀態為 OfflinePartition 的 topic 選舉 leader */ partitionStateMachine.startup() info(s"Ready to serve as the new controller with epoch $epoch") /** * 判斷是否需要重新分配partition * 檢查是否topic的partition的副本需要重新分配(reassign), * 如果partitionsBeingReassigned緩存中的分配信息和controllerContext緩存中不一致,則需要觸發重新分配 * * 分區副本重分配的方法入口是maybeTriggerPartitionReassignment方法, * 該方法會在Controller初始化和PartitionReassignment事件處理器中調用 */ maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet) topicDeletionManager.tryTopicDeletion() /** Preferred leader副本選舉 * 首先是通過fetchPendingPreferredReplicaElections獲取要進行Preferred leader副本選舉的分區 * */ val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections() /** * onPreferredReplicaElection方法通過分區狀態機,將分區轉換為OnlinePartition狀態, * 並根據PreferredReplicaPartitionLeaderElectionStrategy選舉leader, */ onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered) info("Starting the controller scheduler") /** 定時任務 */ kafkaScheduler.startup() if (config.autoLeaderRebalanceEnable) { scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS) } if (config.tokenAuthEnabled) { info("starting the token expiry check scheduler") tokenCleanScheduler.startup() tokenCleanScheduler.schedule(name = "delete-expired-tokens", fun = () => tokenManager.expireTokens, period = config.delegationTokenExpiryCheckIntervalMs, unit = TimeUnit.MILLISECONDS) } }(四)從KafkaController類看Controller的主要工作【主要看寫的源碼注釋】/** config: Kafka配置信息 * zkClient:Zk客戶端,Controller與zookeeper交互使用該屬性 * Time, 時間戳工具類 * initialBrokerInfo: Broker 節點信息,hostname,port等 * initialBrokerEpoch: Controller所在Broker的Epoch值 *//** * KafkaController#startup之前,需要說明下KafkaController中有很多成員變量,主要分為 * zk事件處理器(ZNodeChangeHandler,ZNodeChildChangeHandler) * StateMachine(有限狀態機): 副本的狀態機,分區的狀態機,主要負責狀態的維護及轉換時的處理 * ControllerContext:broker,topic,partition,replica相關的數據緩存 * ControllerEventManager: zk事件管理器, */class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = s"[Controller id=${config.brokerId}] " @volatile private var brokerInfo = initialBrokerInfo @volatile private var _brokerEpoch = initialBrokerEpoch private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None) /** * 實例化上下文 * broker,topic,partition,replica相關的數據緩存 * 維護上下文信息,緩存 ZK 中記錄的整個集群的元數據信息 */ val controllerContext = new ControllerContext // have a separate scheduler for the controller to be able to start and stop independently of the kafka server // visible for testing private[controller] val kafkaScheduler = new KafkaScheduler(1) // visible for testing , /** ControllerEventManager: zk事件管理器 * ControllerEventManager 在core/src/main/scala/kafka/controller/ControllerEventManager.scala * 初始化eventManager,對ControllerEvent事件進行管理 * * KafkaController重要屬性 * activeControllerId:獲取controller的主broker的id * eventManager: ControllerEventManager實例,負責處理事件 * * */ private[controller] val eventManager = new ControllerEventManager(config.brokerId, controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign()) /** topic 刪除管理*/ val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient) /** Controller 給 其他broker 發送批量請求 , 這裡有一個比較重要的對象,就是 ControllerBrokerRequestBatch 對象, * 在ControllerChannelManager.scala中定義 * */ private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger) /** StateMachine(有限狀態機): 副本的狀態機,分區的狀態機,主要負責狀態的維護及轉換時的處理 */ /** 管理集群中所有副本狀態的狀態機 */ val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) /** 管理集群中所有分區狀態的狀態機 */ff val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) partitionStateMachine.setTopicDeletionManager(topicDeletionManager) /** Controller 控制的 事件 */ /** Controller 節點 zookeeper 監聽器*/ private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager) /** Broker zookeeper 監聽器*/ private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager) /** Broker 節點信息變更 zookeeper 監聽器 */ private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty /** 主題數量 zookeeper 監聽器 */ private val topicChangeHandler = new TopicChangeHandler(this, eventManager) /** 主題刪除 zookeeper 監聽器 */ private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager) /** 主題分區變更 zookeeper 監聽器 */ private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty /** 主題分區重分配 zookeeper 監聽器 * 分區副本重分配主要由/admin/reassign_partitions節點的create事件觸發,該事件的處理器為partitionReassignmentHandler, * */ private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager) /** Preferred Leader 選舉zookeeper 監聽器*/ private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager) /** ISR副本集群變更 zookeeper 監聽器 */ private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager) /** 日誌路徑變更 zookeeper 監聽器*/ private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)(五)其他源碼部分
Controller還有幾個重要部分的源碼:
Controller 發送模型NetWork
ControllerChannelManager
Controller-Partition狀態機
Controller-Replica狀態機
Controller-分區副本重分配(PartitionReassignment)與Preferred leader副本選舉
Controller-Broker的上線與下線
Controller-LeaderAndIsr請求
Topic 的新建/擴容/刪除
由於代碼和注釋比較多,在此略過。
參考資料:1.Kafka運維填坑
2.Matt's Blog
3.What is Kafka’s controller broker
4.ZooKeeper:A Distributed Coordination Service for Distributed Applications
騰訊運營規劃工程師,目前負責騰訊遊戲萬億級實時數倉、BG數據中台的運營工作。有豐富的消息中間件,分布式大數據處理引擎的運營管理經驗。