7月30日,由 Kyligence 主辦的 Data & Cloud Summit 2021 行業峰會在上海成功舉辦,此次峰會特設「開源有道」分論壇,邀請了來自 Apache Kylin,Apache Spark,Alluxio,Linkis,Ray 以及 MLSQL 等開源社區的技術大佬,分享了目前開源社區關於大數據、機器學習等多個熱門話題的前沿技術和最佳實踐。來自 eBay 數據團隊的馬剛分享了他們在用 Apache Spark 完全替代傳統數倉中遇到的技術挑戰及改進等話題,引起了現場觀眾的熱烈討論。
以下為馬剛在大會演講實錄
大家好!我叫馬剛,來自 eBay 的大數據團隊,很高興今天有機會在這裡分享我們團隊在過去 2 年做的工作,主要是基於開源的 Spark 和 Hadoop 替換掉傳統數據倉庫。今天我會講到我們在用 Apache Spark 替換傳統的數據倉庫中遇到的技術挑戰,以及我們怎麼解決的。
今天我分享的 Agenda 如下:
系統介紹
技術挑戰
- 功能性改進
- 性能改進
- 穩定性改進
總結
系統介紹
我們這個系統的名字叫 Carmel,它是基於開源的 Hadoop 和 Spark 來替換傳統的數據倉庫,我們是 2019 年開始做我們這個項目的,當時是基於 Spark 2.3.1,最近剛剛升到 Spark 3.0。面臨的主要技術挑戰,第一個是功能方面的缺失,包括訪問控制,還有一些 Update 和 Delete 的支持;在性能方面跟傳統數倉,特別是交互式的分析查詢中性能方面存在較大差距,還有一些穩定性的問題。

這是 Carmel 系統的整體架構,比較簡單,可以看到我們自己開發了 ODBC 和 JDBC driver,對接一些 BI 工具,用戶可以直接用 Python 或者 Java 來連我們的系統。中間是 Gateway,用來做用戶認證以及權限檢查,之後會把用戶的 SQL 請求發到一個分析查詢的集群。這裡有兩個集群,一個是我們主要使用的分析集群,另外一個是 eBay 內部比較通用的 Hadoop 集群,是用來跑大量的 Spark 或者 MapReduce 的任務,包括一些機器學習任務等,比較忙。
這個分析集群主要是服務於 eBay 內部的分析師,我們 Spark 也是跑在 YARN 上面,不同的部門會切不同的 Queue,這些不同部門的請求之後會分到對應 Queue 的 Spark Driver,是一個 long running 的服務,它是一直啟動着的,給所有部門的用戶,所有 SQL 都是它提供服務,去解析、去進行執行的。這個分析集群是 SSD 存儲,存儲的性能比較好。上面主要是存一些 Spark Shuffle 和 Cache 的數據,以及分析師個人的數據集。這個分析集群的 Spark Driver 還可以共享集群的 HDFS 數據,讓用戶也可以直接分析生產的一些數據集,這個分析的集群跟 General 集群是共享一個 Hive Metastore 的,表是可以互相訪問的。
功能改進
接下來我會講在功能性的改進方面做了哪些工作。
訪問控制

第一是訪問控制,前面提到我們有一個 Gateway 的服務器負責做一些用戶權限認證,還有對一些集群和 Queue 的訪問控制。我們通過一個系統賬戶來讀寫 HDFS,個人賬戶不能直接訪問 HDFS,來避免一些安全方面的問題。對於一些數據庫或者表級別的訪問權限控制,我們是基於 SQL 來做訪問控制,類似於傳統數據庫的訪問控制 SQL 語句,例如 Grant、創建 Role 等。用戶也是通過 view 來進行列級別的訪問控制,可以針對每一個物理表建立不同的 view,讓某些用戶只能訪問一些不敏感的列。
對 Update/Delete 支持

我們是基於 Delta Lake 來做 Update 和 Delete 的支持。剛開始做的時候是基於 0.4 的版本,現在升級到了最新版本。Delta Lake 當時只支持 Dataframe 的接口,我們提供了 Update/Delete 的 SQL 語法的支持,還支持了比較先進的傳統數倉中會支持 Update/Delete with Join 語法。還有就是 Delta 表的管理,我們這個系統是基於 Hadoop 的,大家知道 Hadoop 最怕小文件,但是 Delta Lake 的特點就是每次都會創建比較多的小文件出來,會使 Hadoop 系統不是很穩定,我們會定期地去清除老版本的 Delta 文件,做一些 Delta 表的管理工作。
上傳下載 API

我們還對一些缺失的功能進行了改進,比如上傳下載的 API。用戶經常會把一些外部數據傳到數倉裡面去做分析,比如對一些社交媒體的數據進行分析,我們需要支持這種上傳 CSV 文件到某個表或者某個表的分區。其次就是下載,用戶經常會用一些 BI 工具,比如 Tableau 或者 MicroStrategy 去做分析,這種分析工具經常會需要下載比較大規模的結果集到工具本身去進行操作,比如構建一些本地的 Cube 等,而且我們線上發現用戶經常會下載 100G 到 200G 的數據到本地。Spark 原生的一些 API、thrift API 的性能是比較差的。我們也自己實現了一些下載的 API,來提升性能,把那些 Parquet 文件直接下載到文地,通過 ODBC driver 去迭代本地的 Parquet 文件,來提高它的性能。經過我們測試,通過這種方式去訪問超大的數據集會比傳統的 Thrift API 快 3 - 4 倍,因為減少了大量的 RPC 調用,還有每條記錄的序列化、反序列化的開銷。
其他新功能
我們還增加了很多其他新功能,比如支持 Volatile table,因為 Spark 社區版只支持一個 Temporary view,只定義一個 SQL,不會 materialized 到存儲中去。傳統數倉其實有這種功能,用戶寫的時候經常會建了很多 tmp table,把 tmp table 遷到 Spark 中去,如果直接用 Temporary view 來代替的話,最後生成的 SQL 執行計劃會非常複雜,性能會非常差。
我們也實現了對 like any/like all 的支持,還有對壓縮表的支持,主要也是解決 Hadoop 中一些小文件的問題,把小文件壓縮成一些比較大的文件。
性能改進
接下來我會介紹我們在性能改進方面所做的一些工作。
性能改進 - 透明數據緩存

第一個,透明的數據緩存,前面介紹過我們系統的架構裡面有 2 個 HFDS,一個是分析集群的 HDFS、一個是共享集群的 HDFS,共享集群的 HFDS Load 比較高,經常會跑一些機器學習的任務。如果用戶要訪問共享集群的 HDFS,經常會不穩定。比如說共享的 Namenode 不穩定,或者是後面的 Datanode 不穩定,會時快時慢。我們對常用的生產數據集做了數據緩存,會定期把這些常用數據集從共享集群複製到分析集群。用戶分析的時候,生成物理執行計劃的時候會把這些 HDFS 文件透明替換掉,用戶是感知不到你在後面做了緩存的。
性能改進 - 索引

我們也做了一個 Bloom Filter 索引,因為我們線上還是有一些查詢是點查的場景,從非常大的數據集中,最後結果只需要一點點的數據,這時候我們可以建一些索引。我們這個索引是單獨的一個索引文件,跟數據文件是分開的,所以可以比較靈活地根據用戶的需求,在表的不同列上建索引、刪除索引。同時,我們提供了建索引的 SQL 語法,就跟普通的 OLTP 建索引的語法類似。我們也測試了一些比較常見的點查場景,在 80%-90% 的場景下,IO 和 CPU 使用降低比較多。
性能改進 - AQE
我們也對 AQE 進行了一些性能改進。之前提到我們系統最開始是基於 Spark 2.3.1 的,AQE 是 Spark 3.0 的時候引進來的,所以我們把 AQE Backport 進我們版本裡面去了。AQE 對我們所在的場景的性能提升是非常重要的。我們是一個共享的 Spark Driver,用戶是沒辦法設定 Spark 參數的,比如 shuffle partition 的數量都是固定的。在 AQE,你可以做 partition 數量的 coalesce,把一些小的 partition 壓成一個大的 partition。還有就是把 SortMergeJoin 轉成 BroadcastJoin 去處理一些 Skew Join 的 Case。在社區版本的 Spark 中的 Skew Join 的場景是比較簡單的,兩邊是 shuffle stage,中間是一個 SortMergejoin,為了處理這種場景,我們做了更多改進。

比如說第一個式子中,單個 Querystage,右邊是一個 Parquet 表,他是沒有 shuffle 在的。第二種不是這種比較經典的兩個 shuffle stage,是後面加 sort 的,可能中間還有一些別的算子比如 Aggregation 算子,SortMergeJoin 等。
性能改進 - Bucket Join
我們做的另外一個性能改進是 Bucket Join,一些倍數關係的 Bucket 表的 join 是不用 shuffle 的。


我們現在支持兩種不用 shuffle 的方式,一種是 Merge Sort,把多的併到小的,把 Table A 和 Table B 進行 Join,左邊表的 Bucket 數量是 4 個,右邊 Bucket 數量是 8 個,可以按左邊 4 個去做一個 Join,右邊是不需要 shuffle 的。還有 Rebucket,相反的就把少變多,task 數量會變多。缺點可能在於會重複讀數據,IO 會多一些,現在我們生產上是 enable merge sort 這種方式。
另外一個性能改進是 DPP,我們也把 Spark 3.0 中的 DPP 移植到 Spark 2.3 中,做了一些改進,使 DPP 和 AQE 可以協同工作。目前社區版 AQE enable 的時候,DPP 是沒辦法同時運行的。但是這兩個功能對我們線上版本都還是很有用的,所以我們進行了一些改動,使它們可以同時工作。

還有 Runtime Filter,它的原理和 DPP 類似,因為 DPP 要求你的 Join 條件中包含了 partition column 才會 enable 成 DPP,Runtime Filter 可以把一些非 partition 條件做一些 filter 放到左邊,這對某些 case 比較有用,比如在右表很小、左表很大的情況下,如果 filter 效率比較高的話,可以使它的 shuffle 數據量減少非常多。
性能改進 - Range Join

我們做的另外一個性能改進是 Range Join。目前我們線上大概每天有 2000 多個非等值的 Join,join 的條件里都是大於等於、小於等於或者是不等於這種條件,對這種Join,Spark 裡面默認是用Broadcast NestedloopJoin,它的性能是比較差的,特別是對於Join和 Broadcast 的表都比較大,它的時間複雜度是 O(N*M)。我們對一些 Range join,比如 join 條件是 A Between B and C 這樣的,我們會用 BroadcastRangeJoin,因為 Broadcast NestedloopJoin 會 Broadcast 那個小表,BroadcastRangeJoin 相當於是給那個小表做了索引,給它排個序,Join 時候就不需要每一條都掃描,只掃描一部分就可以了,他的複雜度就降為 O(N*2*LOG(M)),所以在某些場景下,性能會有上百倍的提升,比如左邊有1000萬,右邊有100萬 Join 的這種 Case。
Parquet 讀優化

我們還做了一些對 Parquet 的讀優化,主要是做了一些降低讀 Parquet 文件時的 NameNode rpc 的調用,以及多線程讀 Parquet 文件。為什麼我們要用多線程去讀呢?舉個例子,在我們線上,有一個用戶行為的表非常大,而且是一個 bucket 表,bucket 數量不能太多,數量太多的話會讓 task 也非常多,系統非常忙。我們對這種 bucket 表進行掃描的時候,可能一個task要讀的Parquet文件就非常多,這種 Hadoop 的「讀」就成為查詢的瓶頸了。對於這種場景,我們可以使用多線程去讀 Parquet 文件。我們前面說了系統中有共享的集群,HDFS 本身不是很穩定,我們會使用多線程去讀的話,會降低 HDFS 讀等待的開銷。還有下推更多的 filter 到 Parquet 文件。
其它性能改進
我們還做了其他方面的性能改進,比如調度性能改進,對 Spark 里 DAGScheduler 的改進等。因為我們是一個共享的 Spark Driver,很多用戶對調度的要求比較高,希望 task 能夠很快地調動起來,但是現有 Spark 社區版本里大部分都不是這種場景的,所以對調度的性能要求沒那麼高,我們做了很多異步化和多線程的改造。同時我們還做了 Spark Driver 里一些鎖的優化,從比較大的鎖粒度降到比較小的鎖粒度。第三點是物化視圖,目前這個功能已經完成,但是沒有在線上用起來,因為還存在一些數據質量方面的問題,在物化視圖更新這方面還沒完善,所以說我們還在做。
穩定性改進
Driver 穩定性改進
接下來我會講一下在穩定性方面的一些改進,主要講Executor和Driver兩個方面,很多用戶需要Driver,Driver 是長期運行的一個服務,對穩定性要求非常高,我們做了以下改進:
大結果集溢寫到磁盤;
限制最終結果集和中間結果集的大小;
Broadcast 的管理和限制。經過線上的跟蹤,都是這個表太多引起內存的問題,我們對此進行了管理和限制;
單個 SQL 的 task 數量以及總 task 時間限制;
單次 table scan 的文件數和大小的限制;
SQL 優化階段的限制;
Join 膨脹率的限制;
Spark UI 的分離;
DAGSchedule 線程模型改進。
Executor 穩定性改進
我們還做了一些 Executor 的穩定性改進,包括:
shuffle 內存控制;
UDF review/release 流程改進;
加各種限制保護 Executor 內存。
總結

最後,我簡單做一個總結,上圖是我們現在的一個系統狀況。大概每天跑 30 多萬個查詢,80 分位的查詢在 20 秒左右,95 分位是 100 秒左右,目前大概是這樣的情況。
推薦閱讀
Apache Hudi 在 B 站構建實時數據湖的實踐
Apache Hudi在華米科技的應用-湖倉一體化改造
基於Hudi的流式CDC實踐一:聽說你準備了面試題?
如何將數據更快導入Apache Hudi?
Flink + Hudi,構架倉湖一體化解決方案