一、概述
Apache Kafka 發展至今,已經是一個很成熟的消息隊列組件了,也是大數據生態圈中不可或缺的一員。Apache Kafka 社區非常的活躍,通過社區成員不斷的貢獻代碼和迭代項目,使得 Apache Kafka 功能越發豐富、性能越發穩定,成為企業大數據技術架構解決方案中重要的一環。
Apache Kafka 作為一個熱門消息隊列中間件,具備高效可靠的消息處理能力,且擁有非常廣泛的應用領域。那麼,今天就來聊一聊基於 Kafka 的實時數倉在搜索的實踐應用。
二、為什麼需要 Kafka
在設計大數據技術架構之前,通常會做一些技術調研。我們會去思考一下為什麼需要 Kafka?怎麼判斷選擇的 Kafka 技術能否滿足當前的技術要求?
2.1 早期的數據架構
早期的數據類型比較簡單,業務架構也比較簡單,就是將需要的數據存儲下來。比如將遊戲類的數據存儲到數據庫(MySQL、Oracle)。但是,隨着業務的增量,存儲的數據類型也隨之增加了,然後我們需要使用的大數據集群,利用數據倉庫來將這些數據進行分類存儲,如下圖所示:
但是,數據倉庫存儲數據是有時延的,通常時延為T+1。而現在的數據服務對象對時延要求均有很高的要求,例如物聯網、微服務、移動端APP等等,皆需要實時處理這些數據。
2.2Kafka 的出現
Kafka 的出現,給日益增長的複雜業務,提供了新的存儲方案。將各種複雜的業務數據統一存儲到 Kafka 裡面,然後在通過 Kafka 做數據分流。如下圖所示:
這裡,可以將視頻、遊戲、音樂等不同類型的數據統一存儲到 Kafka 裡面,然後在通過流處理對 Kafka 裡面的數據做分流操作。例如,將數據存儲到數據倉庫、將計算的結果存儲到KV做實時分析等。
通常消息系統常見的有兩種,它們分別是:
消息隊列:隊列消費者充當了工作組的角色,每條消息記錄只能傳遞給一個工作進程,從而有效的劃分工作流程;
生產&消費:消費者通常是互相獨立的,每個消費者都可以獲得每條消息的副本。
這兩種方式都是有效和實用的,通過消息隊列將工作內容分開,用於容錯和擴展;生產和消費能夠允許多租戶,來使得系統解耦。而 Apache Kafka 的優點之一在於它將消息隊列、生產和消費結合到了一個強大的消息系統當中。
同時,Kafka 擁有正確的消息處理特性,主要體現在以下幾個方面:
可擴展性:當 Kafka 的性能(如存儲、吞吐等)達到瓶頸時,可以通過水平擴展來提升性能;
真實存儲:Kafka 的數據是實時落地在磁盤上的,不會因為集群重啟或故障而丟失數據;
實時處理:能夠集成主流的計算引擎(如Flink、Spark等),對數據進行實時處理;
順序寫入:磁盤順序 I/O 讀寫,跳過磁頭「尋址」時間,提高讀寫速度;
內存映射:操作系統分頁存儲利用內存提升 I/O 性能,實現文件到內存的映射,通過同步或者異步來控制 Flush;
零拷貝:將磁盤文件的數據複製到「頁面緩存」一次,然後將數據從「頁面緩存」直接發送到網絡;
高效存儲:Topic 和 Partition 拆為多個文件片段(Segment),定期清理無效文件。採用稀疏存儲,間隔若干字節建立一條索引,防止索引文件過大。
2.3 簡單的應用場景
這裡,我們可以通過一個簡單直觀的應用場景,來了解 Kafka 的用途。
場景:假如用戶A正在玩一款遊戲,某一天用戶A喜歡上了遊戲裡面的一款道具,打算購買,於是在當天 14:00 時充值了 10 元,在逛遊戲商店時又喜歡上了另一款道具,於是在 14:30 時又充值了 30 元,接着在 15:00 時開始下單購買,花費了 20 元,剩餘金額為 20 元。那麼,整個事件流,對應到庫表裡面的數據明細應該是如下圖所示:
三、Kafka解決了什麼問題
早期為響應項目快速上線,在服務器或者雲服務器上部署一個 WebServer,為個人電腦或者移動用戶提供訪問體驗,然後後台在對接一個數據庫,為 Web 應用提供數據持久化以及數據查詢,流程如下圖所示:
但是,隨着用戶的迅速增長,用戶所有的訪問都直接通過 SQL 數據庫使得它不堪重負,數據庫的壓力也越來越大,不得不加上緩存服務以降低 SQL 數據庫的荷載。
同時,為了理解用戶行為,又開始收集日誌並保存到 Hadoop 這樣的大數據集群上做離線處理,並且把日誌放在全文檢索系統(比如 ElasticSearch)中以便快速定位問題。由於需要給投資方看業務狀況,也需要把數據匯總到數據倉庫(比如 Hive)中以便提供交互式報表。此時的系統架構已經具有一定的複雜性了,將來可能還會加入實時模塊以及外部數據交互。
本質上,這是一個數據集成問題。沒有任何一個系統能夠解決所有的事情,所以業務數據根據不同用途,存放在不同的系統,比如歸檔、分析、搜索、緩存等。數據冗餘本身沒有任何問題,但是不同系統之間太過複雜的數據同步卻是一種挑戰。如下圖所示:
而 Kafka 可以讓合適的數據以合適的形式出現在合適的地方。Kafka 的做法是提供消息隊列,讓生產者向隊列的末尾添加數據,讓多個消費者從隊列裡面依次讀取數據然後自行處理。如果說之前連接的複雜度是 O(N^2),那麼現在複雜度降低到了 O(N),擴展起來也方便多了,流程如下圖所示:
四、Kafka的實踐應用
4.1 為什麼需要建設實時數倉
4.1.1 目的
通常情況下,在大數據場景中,存儲海量數據建設數據倉庫一般都是離線數倉(時延T+1),通過定時任務每天拉取增量數據,然後創建各個業務不同維度的數據,對外提供 T+1 的數據服務。計算和數據的實時性均比較差,業務人員無法根據自己的即時性需求獲取幾分鐘之前的實時數據。數據本身的價值隨着時間的流逝會逐步減弱,因此數據產生後必須儘快的到達用戶的手中,實時數倉的建設需求由此而來。
4.1.2 目標
為了適應業務高速迭代的特點,分析用戶行為,挖掘用戶價值,提高用戶留存,在實時數據可用性、可擴展性、易用性、以及準確性等方面提供更好的支持,因此需要建設實時數倉。主要目標包含如下所示:
統一收斂數據出口:統一數據口徑,減少數據重複性建設;
降低數據維護成本:提升數據準確性、及時性,優化數據使用體驗和成本;
減少數據使用成本:提高數據復用率,避免實時數據重複消費。
4.2 如何構建實時數倉為搜索提供數據
當前實時數倉比較主流的架構一般來說包含三個大的模塊,它們分別是消息隊列、計算引擎、以及存儲。結合上述對 Kafka 的綜合分析,結合搜索的業務場景,引入 Kafka 作為消息隊列,復用大數據平台(BDSP)的能力作為計算引擎和存儲,具體架構如下圖所示:
4.3 流處理引擎選擇
目前業界比較通用的流處理引擎主要有兩種,它們分別是Flink和Spark,那麼如何選擇流處理引擎呢?我們可以對比以下特徵來決定選擇哪一種流處理引擎?
Flink作為一款開源的大數據流式計算引擎,它同時支持流批一體,引入Flink作為實時數倉建設的流引擎的主要原因如下:
高吞吐、低延時;
靈活的流窗口;
輕量級容錯機制;
流批一體
4.4 建設實時數倉遇到的問題
在建設初期,用於實時處理的 Kafka 集群規模較小,單個 Topic 的數據容量非常大,不同的實時任務都會消費同一個大數據量的 Topic,這樣會導致 Kafka 集群的 I/O 壓力非常的大。
因此,在使用的過程中會發現 Kafka 的壓力非常大,經常出現延時、I/O能性能告警。因此,我們採取了將大數據量的單 Topic 進行實時分發來解決這種問題,基於 Flink 設計了如下圖所示的數據分發流程。
上述流程,隨着業務類型和數據量的增加,又會面臨新的問題:
數據量增加,隨着消費任務的增加,Kafka 集群 I/O 負載大時會影響消費;
不用業務之間 Topic 的消費沒有落地存儲(比如HDFS、HBase存儲等),會產生重複消費的情況;
數據耦合度過高,遷移數據和任務難度大。
4.5 實時數倉方案進階
目前,主流的實時數倉架構通常有2種,它們分別是Lambda、Kappa。
4.5.1 Lambda
隨着實時性需求的提出,為了快速計算一些實時指標(比如,實時點擊、曝光等),會在離線數倉大數據架構的基礎上增加一個實時計算的鏈路,並對消息隊列實現數據來源的流失處理,通過消費消息隊列中的數據 ,用流計算引擎來實現指標的增量計算,並推送到下游的數據服務中去,由下游數據服務層完成離線和實時結果的匯總。具體流程如下:
4.5.2 Kappa
Kappa架構只關心流式計算,數據以流的方式寫入到 Kafka ,然後通過 Flink 這類實時計算引擎將計算結果存放到數據服務層以供查詢。可以看作是在Lambda架構的基礎上簡化了離線數倉的部分。具體流程如下:
在實際建設實時數倉的過程中,我們結合這2種架構的思想來使用。實時數倉引入了類似於離線數倉的分層理念,主要是為了提供模型的復用率,同時也要考慮易用性、一致性、以及計算的成本。
4.5.3 實時數倉分層
在進階建設實時數倉時,分層架構的設計並不會像離線數倉那邊複雜,這是為了避免數據計算鏈路過長造成不必要的延時情況。具體流程圖如下所示:
ODS層:以Kafka 作為消息隊列,將所有需要實時計算處理的數據放到對應的 Topic 進行處理;
DW層:通過Flink實時消費Topic中的數據,然後通過數據清理、多維度關聯(JOIN)等,將一些相同維度的業務系統、維表中的特徵屬性進行關聯,提供數據易用性和復用性能力,最終得到實時明細數據;
DIM層:用來存儲關聯的查詢的維度信息,存儲介質可以按需選擇,比如HBase、Redis、MySQL等;
DA層:針對實時數據場景需求,進行高度聚合匯總,服務於KV、BI等場景。OLAP分析可以使用ClickHouse,KV可以選擇HBase(若數據量較小,可以採用Redis)。
通過上面的流程,建設實時數倉分層時,確保了對實時計算要求比較高的任務不會影響到BI報表、或者KV查詢。但是,會有新的問題需要解決:
Kafka 實時數據如何點查?
消費任務異常時如何分析?
4.5.4 Kafka監控
針對這些問題,我們調研和引入了Kafka 監控系統——Kafka Eagle(目前改名為EFAK)。復用該監控系統中比較重要的維度監控功能。
Kafka Eagle處理能夠滿足上訴兩個維度的監控需求之外,還提供了一些日常比較實用的功能,比如Topic記錄查看、Topic容量查看、消費和生產任務的速率、消費積壓等。我們採用了 Kafka-Eagle 來作為對實時數倉的任務監控。Kafka-Eagle 系統設計架構如下圖所示:
Kafka-Eagle 是一款完全開源的對 Kafka 集群及應用做全面監控的系統,其核心由以下幾個部分組成:
數據採集:核心數據來源 JMX 和 API 獲取;
數據存儲:支持 MySQL 和 Sqlite 存儲;
數據展示:消費者應用、圖表趨勢監控(包括集群狀態、消費生產速率、消費積壓等)、開發的分布式 KSQL 查詢引擎,通過 KSQL 消息查詢;
數據告警:支持常用的 IM 告警(微信,釘釘,WebHook等),同時郵件、短信、電話告警也一併支持。
部分預覽截圖如下:
1)Topic最近7天寫入量分布
默認展示所有Topic的每天寫入總量分布,可選擇時間維度、Topic聚合維度,來查看寫入量的分布情況,預覽截圖如下所示:
2)KSQL查詢Topic消息記錄
可以通過編寫SQL語句,來查詢(支持過濾條件)Topic中的消息記錄,預覽截圖如下所示:
3)消費Topic積壓詳情
可以監控所有被消費的Topic的消費速率、消費積壓等詳情,預覽截圖如下所示:
五、參考資料
1.https://kafka.apache.org/documentation/
2.http://www.kafka-eagle.org/
3.https://github.com/smartloli/kafka-eagle