背景
目前,公司方面 RPC 調用如 Dubbo、Feign 已經能支持基於灰度的調用,但是 MQ 還沒有支持灰度的能力,因此導致在測試和生產環境業務驗證、消息隔離方面體驗比較差,因此我們基於 RabbitMQ 和 Kafka 實現了消息灰度的能力。
灰度場景
大部分場景下 MQ 的灰度並不會像 RPC 那樣那麼嚴格,但是我們需要確認消費場景,即當灰度消費者不存在的情況下,消息是否應該由正常消費者去消費。

事實的情況是可能大家都想要這種嚴格意義上的消息灰度隔離策略,由此才證明是真正的消息灰度方案,但是這個方案需要考慮一些具體場景問題。
比如,有時候作為灰度節點的發送方,它的功能改動點並不是在 MQ 這塊,但是它發送的消息卻是灰度消息,而消息的消費方可能也未發生過功能變動,也不會有與之對應的灰度消費標識,這種情況下如果我們將灰度的消息進行丟棄的話,那麼會造成最終的數據不完整。
因此,我們再考慮第二種方案,如果當灰度消費節點不存在時,消息會由正常節點消費,當存在灰度節點時,則由灰度節點消費,正常節點消費灰度消息只為了當灰度節點不存在時的兜底。
那麼,這種場景仍然可能存在問題,比如當消費節點的消費邏輯發生改變時,由正常節點消費就可能造成業務上的錯誤。對於此問題我們可以默認認為如果消費方發生邏輯改變,則灰度節點大概率一定是存在的,如果一些異常情況導致的異常或者宕機的場景,仍然能通過人工或者告警判斷出來,總之,這個問題認為不算是問題。
灰度方案
我們分別從 MQ 的自身特性和一些通用的處理方式出發,分別探討 RabbitMQ 和 Kafka 的灰度實現方式。
常規方案:影子Queue/Topic
這個是現在實現 MQ 灰度最為常見的方案,為每一個Queue/Topic都建立一個與之對應的灰度Queue/Topic。
生產者層將要發送的消息進行Queue/Topic/RoutingKey的動態修改,讓他發送到灰度或正常的Queue/Topic中。
而消費者層面只需要在應用啟動時根據自身的灰度標記動態的切換到灰度Queue/Topic進行監聽即可。
但是對於我們目前的系統現狀而言,這個方案存在三個問題:

首先,由於我們目前系統測試環境的灰度標籤是可以定製的,可能每一個功能上線都會有一個對應的灰度標識,這樣帶來的問題就是Queue/Topic的數量會隨着灰度標識的增加而倍數性的增加。
而不管哪種MQ,過多的Queue/Topic都會對 MQ 本身造成一定處理能力下降。
另外,我們的灰度標籤是可以根據啟動的實例隨意修改的,也就意味着對應的整套Queue/Topic也得跟着灰度的標識隨意的創建。這樣一來,人工手動跟着創建顯然就不太現實,而生產環境中我們的Queue/Topic創建是需要走流程申請的,這又和我們的現狀違背。
再者,即便我們能夠根據生產者的灰度標識動態的創建Queue/Topic的話,那麼至少也需要考慮在灰度生產者實例正常下線時將它創建的Queue/Topic進行銷毀,如果異常的下線還需要人工的接入定期的進行Queue/Topic的清理工作。
最後,如果是針對 Kafka 或 RocketMQ,這種方案實行起來還比較簡單,如果是對於RabbitMQ,這裡又多了一層 Exchange 和 Queue 的綁定關係,不同的生產模式也需要去做各自的適配。
所以,為了在 RabbitMQ 和 Kafka 之間的一致性,我們決定不採用該方案來實現。
RabbitMQ
對於 RabbitMq,我們使用重新入隊這個特性來實現灰度隊列。
通過重新入隊的這個特性,我們可以在生產者發送消息時將灰度的標識標記到消息頭,發送時一併發出。
當消費者消費消息時,根據消費者自身標記決定要不要對消息進行消費,如果消費者本身不滿足灰度消費規則,則把這條灰度消息進行Requeue處理。
這條消息經過輪詢,最終會流轉到灰度標識的消費者進行消費。

Requeue實現思路
生產者在啟動時,我們通過自動裝配,註冊 RabbitTemplate 時setBeforePublishPostProcessors添加前置處理器,在發送消息前對消息的 Header 添加灰度標記。


消費流程
首先,在消費時通過監聽SimpleMessageListenerContainer重寫executeListener方法進行消息處理。


Kafka
在 Kafka 的消費理念中有一層消費者組的概念,每個消費者都有一個對應的消費組。
當消息發布到主題後,只會被投遞給訂閱它的每個消費組中的一個消費者,兩個消費組之間互不影響。

藉助這個消費特性,可以將同一個消費組中的灰度消費者單獨拎出來,做成一個特殊的消費組,這樣每個消費組都會接收到同樣的消息。
在正常的消費組中,遇到帶有灰度標識的消息,我們只做空消費,不實際執行業務邏輯,在灰度消費組中的消費者,只處理匹配到灰度規則的消息,其它的消息做空消費。
實現思路 生產者生產灰度消息的時候在消息 Header 裡面添加灰度標記 灰度消費者和正常消費者設置不同的GroupId 灰度消費者和正常消費者在拿到消息後判斷有沒有灰度標記,判斷配置中心是否開啟了消息灰度,如果開啟了則進行灰度節點的消費,如果沒開啟則不消費
生產者在啟動的時候會去動態裝配所有的攔截器,裝配的方式為在 BeanPostProcessor 的後置處理器中獲取到 KafkaTemplate 對象,把我們的攔截器的類的全限定名 set 進去 config 即可,這裡可以支持不管用戶自己創建的 Factory對象還是 KafkaTemplate 對象都能進行攔截器的裝配。


消費流程
消費的時候也是一樣,如果當前節點是灰度節點,那麼就修改當前group.id為灰度,最後通過攔截器執行消費邏輯。


- EOF -
假如我是核酸系統架構師,我會...
美團點評數據庫高可用架構的演進與設想
單體架構服務轉型至分布式的踩坑經歷
看完本文有收穫?請轉發分享給更多人
關注「ImportNew」,提升Java技能
點讚和在看就是最大的支持❤️