close

一、概述


Apache Kafka 發展至今,已經是一個很成熟的消息隊列組件了,也是大數據生態圈中不可或缺的一員。Apache Kafka 社區非常的活躍,通過社區成員不斷的貢獻代碼和迭代項目,使得 Apache Kafka 功能越發豐富、性能越發穩定,成為企業大數據技術架構解決方案中重要的一環。

Apache Kafka 作為一個熱門消息隊列中間件,具備高效可靠的消息處理能力,且擁有非常廣泛的應用領域。那麼,今天就來聊一聊基於 Kafka 的實時數倉在搜索的實踐應用。


二、為什麼需要 Kafka


在設計大數據技術架構之前,通常會做一些技術調研。我們會去思考一下為什麼需要 Kafka?怎麼判斷選擇的 Kafka 技術能否滿足當前的技術要求?


2.1 早期的數據架構


早期的數據類型比較簡單,業務架構也比較簡單,就是將需要的數據存儲下來。比如將遊戲類的數據存儲到數據庫(MySQL、Oracle)。但是,隨着業務的增量,存儲的數據類型也隨之增加了,然後我們需要使用的大數據集群,利用數據倉庫來將這些數據進行分類存儲,如下圖所示:

但是,數據倉庫存儲數據是有時延的,通常時延為T+1。而現在的數據服務對象對時延要求均有很高的要求,例如物聯網、微服務、移動端APP等等,皆需要實時處理這些數據。


2.2Kafka 的出現


Kafka 的出現,給日益增長的複雜業務,提供了新的存儲方案。將各種複雜的業務數據統一存儲到 Kafka 裡面,然後在通過 Kafka 做數據分流。如下圖所示:


這裡,可以將視頻、遊戲、音樂等不同類型的數據統一存儲到 Kafka 裡面,然後在通過流處理對 Kafka 裡面的數據做分流操作。例如,將數據存儲到數據倉庫、將計算的結果存儲到KV做實時分析等。

通常消息系統常見的有兩種,它們分別是:

消息隊列:隊列消費者充當了工作組的角色,每條消息記錄只能傳遞給一個工作進程,從而有效的劃分工作流程;

生產&消費:消費者通常是互相獨立的,每個消費者都可以獲得每條消息的副本。

這兩種方式都是有效和實用的,通過消息隊列將工作內容分開,用於容錯和擴展;生產和消費能夠允許多租戶,來使得系統解耦。而 Apache Kafka 的優點之一在於它將消息隊列、生產和消費結合到了一個強大的消息系統當中。

同時,Kafka 擁有正確的消息處理特性,主要體現在以下幾個方面:

可擴展性:當 Kafka 的性能(如存儲、吞吐等)達到瓶頸時,可以通過水平擴展來提升性能;

真實存儲:Kafka 的數據是實時落地在磁盤上的,不會因為集群重啟或故障而丟失數據;

實時處理:能夠集成主流的計算引擎(如Flink、Spark等),對數據進行實時處理;

順序寫入:磁盤順序 I/O 讀寫,跳過磁頭「尋址」時間,提高讀寫速度;

內存映射:操作系統分頁存儲利用內存提升 I/O 性能,實現文件到內存的映射,通過同步或者異步來控制 Flush;

零拷貝:將磁盤文件的數據複製到「頁面緩存」一次,然後將數據從「頁面緩存」直接發送到網絡;

高效存儲:Topic 和 Partition 拆為多個文件片段(Segment),定期清理無效文件。採用稀疏存儲,間隔若干字節建立一條索引,防止索引文件過大。


2.3 簡單的應用場景


這裡,我們可以通過一個簡單直觀的應用場景,來了解 Kafka 的用途。

場景:假如用戶A正在玩一款遊戲,某一天用戶A喜歡上了遊戲裡面的一款道具,打算購買,於是在當天 14:00 時充值了 10 元,在逛遊戲商店時又喜歡上了另一款道具,於是在 14:30 時又充值了 30 元,接着在 15:00 時開始下單購買,花費了 20 元,剩餘金額為 20 元。那麼,整個事件流,對應到庫表裡面的數據明細應該是如下圖所示:


三、Kafka解決了什麼問題


早期為響應項目快速上線,在服務器或者雲服務器上部署一個 WebServer,為個人電腦或者移動用戶提供訪問體驗,然後後台在對接一個數據庫,為 Web 應用提供數據持久化以及數據查詢,流程如下圖所示:


但是,隨着用戶的迅速增長,用戶所有的訪問都直接通過 SQL 數據庫使得它不堪重負,數據庫的壓力也越來越大,不得不加上緩存服務以降低 SQL 數據庫的荷載。

同時,為了理解用戶行為,又開始收集日誌並保存到 Hadoop 這樣的大數據集群上做離線處理,並且把日誌放在全文檢索系統(比如 ElasticSearch)中以便快速定位問題。由於需要給投資方看業務狀況,也需要把數據匯總到數據倉庫(比如 Hive)中以便提供交互式報表。此時的系統架構已經具有一定的複雜性了,將來可能還會加入實時模塊以及外部數據交互。

本質上,這是一個數據集成問題。沒有任何一個系統能夠解決所有的事情,所以業務數據根據不同用途,存放在不同的系統,比如歸檔、分析、搜索、緩存等。數據冗餘本身沒有任何問題,但是不同系統之間太過複雜的數據同步卻是一種挑戰。如下圖所示:

而 Kafka 可以讓合適的數據以合適的形式出現在合適的地方。Kafka 的做法是提供消息隊列,讓生產者向隊列的末尾添加數據,讓多個消費者從隊列裡面依次讀取數據然後自行處理。如果說之前連接的複雜度是 O(N^2),那麼現在複雜度降低到了 O(N),擴展起來也方便多了,流程如下圖所示:


四、Kafka的實踐應用


4.1 為什麼需要建設實時數倉


4.1.1 目的

通常情況下,在大數據場景中,存儲海量數據建設數據倉庫一般都是離線數倉(時延T+1),通過定時任務每天拉取增量數據,然後創建各個業務不同維度的數據,對外提供 T+1 的數據服務。計算和數據的實時性均比較差,業務人員無法根據自己的即時性需求獲取幾分鐘之前的實時數據。數據本身的價值隨着時間的流逝會逐步減弱,因此數據產生後必須儘快的到達用戶的手中,實時數倉的建設需求由此而來。


4.1.2 目標

為了適應業務高速迭代的特點,分析用戶行為,挖掘用戶價值,提高用戶留存,在實時數據可用性、可擴展性、易用性、以及準確性等方面提供更好的支持,因此需要建設實時數倉。主要目標包含如下所示:

統一收斂數據出口:統一數據口徑,減少數據重複性建設;

降低數據維護成本:提升數據準確性、及時性,優化數據使用體驗和成本;

減少數據使用成本:提高數據復用率,避免實時數據重複消費。


4.2 如何構建實時數倉為搜索提供數據


當前實時數倉比較主流的架構一般來說包含三個大的模塊,它們分別是消息隊列、計算引擎、以及存儲。結合上述對 Kafka 的綜合分析,結合搜索的業務場景,引入 Kafka 作為消息隊列,復用大數據平台(BDSP)的能力作為計算引擎和存儲,具體架構如下圖所示:


4.3 流處理引擎選擇


目前業界比較通用的流處理引擎主要有兩種,它們分別是Flink和Spark,那麼如何選擇流處理引擎呢?我們可以對比以下特徵來決定選擇哪一種流處理引擎?


Flink作為一款開源的大數據流式計算引擎,它同時支持流批一體,引入Flink作為實時數倉建設的流引擎的主要原因如下:

高吞吐、低延時;

靈活的流窗口;

輕量級容錯機制;

流批一體

4.4 建設實時數倉遇到的問題


在建設初期,用於實時處理的 Kafka 集群規模較小,單個 Topic 的數據容量非常大,不同的實時任務都會消費同一個大數據量的 Topic,這樣會導致 Kafka 集群的 I/O 壓力非常的大。

因此,在使用的過程中會發現 Kafka 的壓力非常大,經常出現延時、I/O能性能告警。因此,我們採取了將大數據量的單 Topic 進行實時分發來解決這種問題,基於 Flink 設計了如下圖所示的數據分發流程。


上述流程,隨着業務類型和數據量的增加,又會面臨新的問題:

數據量增加,隨着消費任務的增加,Kafka 集群 I/O 負載大時會影響消費;

不用業務之間 Topic 的消費沒有落地存儲(比如HDFS、HBase存儲等),會產生重複消費的情況;

數據耦合度過高,遷移數據和任務難度大。

4.5 實時數倉方案進階


目前,主流的實時數倉架構通常有2種,它們分別是Lambda、Kappa。

4.5.1 Lambda

隨着實時性需求的提出,為了快速計算一些實時指標(比如,實時點擊、曝光等),會在離線數倉大數據架構的基礎上增加一個實時計算的鏈路,並對消息隊列實現數據來源的流失處理,通過消費消息隊列中的數據 ,用流計算引擎來實現指標的增量計算,並推送到下游的數據服務中去,由下游數據服務層完成離線和實時結果的匯總。具體流程如下:

4.5.2 Kappa

Kappa架構只關心流式計算,數據以流的方式寫入到 Kafka ,然後通過 Flink 這類實時計算引擎將計算結果存放到數據服務層以供查詢。可以看作是在Lambda架構的基礎上簡化了離線數倉的部分。具體流程如下:

在實際建設實時數倉的過程中,我們結合這2種架構的思想來使用。實時數倉引入了類似於離線數倉的分層理念,主要是為了提供模型的復用率,同時也要考慮易用性、一致性、以及計算的成本。


4.5.3 實時數倉分層


在進階建設實時數倉時,分層架構的設計並不會像離線數倉那邊複雜,這是為了避免數據計算鏈路過長造成不必要的延時情況。具體流程圖如下所示:


ODS層:以Kafka 作為消息隊列,將所有需要實時計算處理的數據放到對應的 Topic 進行處理;

DW層:通過Flink實時消費Topic中的數據,然後通過數據清理、多維度關聯(JOIN)等,將一些相同維度的業務系統、維表中的特徵屬性進行關聯,提供數據易用性和復用性能力,最終得到實時明細數據;

DIM層:用來存儲關聯的查詢的維度信息,存儲介質可以按需選擇,比如HBase、Redis、MySQL等;

DA層:針對實時數據場景需求,進行高度聚合匯總,服務於KV、BI等場景。OLAP分析可以使用ClickHouse,KV可以選擇HBase(若數據量較小,可以採用Redis)。


通過上面的流程,建設實時數倉分層時,確保了對實時計算要求比較高的任務不會影響到BI報表、或者KV查詢。但是,會有新的問題需要解決:

Kafka 實時數據如何點查?

消費任務異常時如何分析?

4.5.4 Kafka監控


針對這些問題,我們調研和引入了Kafka 監控系統——Kafka Eagle(目前改名為EFAK)。復用該監控系統中比較重要的維度監控功能。

Kafka Eagle處理能夠滿足上訴兩個維度的監控需求之外,還提供了一些日常比較實用的功能,比如Topic記錄查看、Topic容量查看、消費和生產任務的速率、消費積壓等。我們採用了 Kafka-Eagle 來作為對實時數倉的任務監控。Kafka-Eagle 系統設計架構如下圖所示:

Kafka-Eagle 是一款完全開源的對 Kafka 集群及應用做全面監控的系統,其核心由以下幾個部分組成:

數據採集:核心數據來源 JMX 和 API 獲取;

數據存儲:支持 MySQL 和 Sqlite 存儲;

數據展示:消費者應用、圖表趨勢監控(包括集群狀態、消費生產速率、消費積壓等)、開發的分布式 KSQL 查詢引擎,通過 KSQL 消息查詢;

數據告警:支持常用的 IM 告警(微信,釘釘,WebHook等),同時郵件、短信、電話告警也一併支持。

部分預覽截圖如下:

1)Topic最近7天寫入量分布


默認展示所有Topic的每天寫入總量分布,可選擇時間維度、Topic聚合維度,來查看寫入量的分布情況,預覽截圖如下所示:


2)KSQL查詢Topic消息記錄


可以通過編寫SQL語句,來查詢(支持過濾條件)Topic中的消息記錄,預覽截圖如下所示:


3)消費Topic積壓詳情


可以監控所有被消費的Topic的消費速率、消費積壓等詳情,預覽截圖如下所示:


五、參考資料


1.https://kafka.apache.org/documentation/

2.http://www.kafka-eagle.org/

3.https://github.com/smartloli/kafka-eagle


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()