在幾乎所有處理複雜數據的領域,Spark 已經迅速成為數據和分析生命周期團隊的事實上的分布式計算框架。Spark 3.0 最受期待的特性之一是新的自適應查詢執行框架(Adaptive Query Execution,AQE),該框架解決了許多 Spark SQL 工作負載遇到的問題。AQE 在2018年初由英特爾和百度組成的團隊最早實現。AQE 最初是在 Spark 2.4 中引入的, Spark 3.0 做了大量的完善和優化。另外,Spark 3.0 的動態分區修剪(Dynamic Partition Pruning, DPP)也為一些工作負載帶來了性能提升。本文將詳細介紹這兩種優化的原理。
Adaptive Query ExecutionCatalyst 早期實現的缺陷
下圖表示了使用 DataFrames 執行簡單的分組計數查詢時發生的分布式處理:
Spark 在第一階段(stage)確定了適當的分區數量,但對於第二階段,使用默認的分區數 200。也就是 spark.sql.shuffle.partitions 參數的默認值。這個默認值有以下幾個問題:
•200個分區值不太可能是理想的分區數量,而分區數是影響性能的關鍵因素之一;•如果將上圖的第二個階段的數據寫到磁盤,那麼我們將得到200個小文件。
你可以做的事情是在執行查詢之前手動設置這個屬性的值,就像這樣:
雖然我們可以手動設置分區數,但是也有以下挑戰:
•每個查詢都設置這個屬性是非常繁瑣的;•隨着業務的變化,之前設置的值可能會過時;•這個設置將應用於同一個程序裡面的所有 Shuffle 操作中。
在上一個示例的第一個階段之前,數據的分布和容量我們是可以知道的,Spark 可以為分區數量提供一個合理的值。然而對於第二階段,經過第一個階段處理後的數據大小很難準確估計,所以我們只能自己去估計。
自適應查詢執行設計原理
AQE 完全基於精確的運行時統計信息進行優化,引入了 Query Stages 的概念 ,並且以 Query Stage 為粒度,進行運行時的優化,其工作原理如下所示:
Query Stage 是由 Shuffle 或 broadcast exchange 劃分的,在運行下一個 Query Stage 之前,上一個 Query Stage 的計算需要全部完成,這是進行運行時優化的絕佳時機,因為此時所有分區上的數據統計都是可用的,並且後續操作還沒有開始。
AQE 可以理解成是 Spark Catalyst 之上的一層,它可以在運行時修改 Spark plan。
自適應調整分區數
這個功能從 Spark 2.4 開始就引入了,當啟用 AQE 時,將自動調整 shuffle 分區的數量,不再是默認的200或手動設置的值。要使用上面的功能,需要設置 spark.sql.adaptive.coalescePartitions.enabled 、spark.sql.adaptive.enabled 以及 spark.sql.adaptive.advisoryPartitionSizeInBytes。這時候 Spark 將會把連續的 shuffle partitions 進行合併(coalesce contiguous shuffle partitions)以減少分區數。
假設我們運行SELECT max(i)FROM tbl GROUP BY j查詢,tbl 表的輸入數據相當小,所以在分組之前只有兩個分區。我們把初始的 shuffle 分區數設置為 5,因此在 shuffle 的時候數據被打亂到 5 個分區中。如果沒有 AQE,Spark 將啟動 5 個任務來完成最後的聚合。然而,這裡有三個非常小的分區,為每個分區啟動一個單獨的任務將是一種浪費。
使用 AQE 之後,Spark 將這三個小分區合併為一個,因此,最終的聚合只需要執行三個任務,而不是五個。
動態將 Sort Merge Joins 轉換成 Broadcast Joins
Spark 支持許多 Join 策略,其中 broadcast hash join 通常是性能最好的,前提是參加 join 的一張表的數據能夠裝入內存。由於這個原因,當 Spark 估計參加 join 的表數據量小於廣播大小的閾值時,其會將 Join 策略調整為 broadcast hash join。但是,很多情況都可能導致這種大小估計出錯,比如表的統計信息不準確等。
有了 AQE,Spark 可以利用運行時的統計信息動態調整 Join 方式,只要參與 Join 的任何一方的大小小於廣播大小的閾值時,即可將 Join 策略調整為 broadcast hash join。如下圖就是利用這個調整 Join 策略的。
對於在運行時轉換的 broadcast hash join ,我們可以進一步將常規的 shuffle 優化為本地化 shuffle來減少網絡流量。
動態優化傾斜的 join
當數據在集群中的分區之間分布不均時,就會發生數據傾斜。嚴重的傾斜會顯著降低查詢性能,特別是在進行 Join 操作時。AQE 傾斜 Join 優化從 shuffle 文件統計信息中自動檢測到這種傾斜。然後,它將傾斜的分區分割成更小的子分區,這些子分區將分別從另一端連接到相應的分區。
假設表 A join 表B,其中表 A 的分區 A0 裡面的數據明顯大於其他分區。
將把分區 A0 分成兩個子分區,並將每個子分區 join 表 B 的相應分區 B0。
如果沒有這個優化,將有四個任務運行 sort merge join,其中一個任務將花費非常長的時間。在此優化之後,將有5個任務運行 join,但每個任務將花費大致相同的時間,從而獲得總體更好的性能。要啟用上面的功能需要用到下面三個參數:
•spark.sql.adaptive.skewJoin.enabled:是否啟用傾斜 Join 處理;•spark.sql.adaptive.skewJoin.skewedPartitionFactor:如果一個分區的大小大於這個數乘以分區大小的中值(median partition size),並且也大於spark.sql.adaptive.skewedPartitionThresholdInBytes 這個屬性值,那麼就認為這個分區是傾斜的。•spark.sql.adaptive.skewedPartitionThresholdInBytes:判斷分區是否傾斜的閾值,默認為 256MB,這個參數的值應該要設置的比 spark.sql.adaptive.advisoryPartitionSizeInBytes 大。
動態分區裁減
Spark 3.0 的第二個比較重要的性能優化是動態分區裁減(Dynamic Partition Pruning,簡稱 DPP),需要注意的是,如果開啟了動態分區裁減,那麼 AQE 將不會被觸發。這個優化在邏輯計劃和物理計劃上都有實現。
•在邏輯計劃層面,通過維度表構造出一個過濾子查詢,然後在掃描事實表之前加上這個過濾子查詢。通過這種方式,我們在邏輯計劃階段就知道事實表需要掃描哪些分區。但是,物理計劃執行起來還是比較低效。因為裡面有重複的子查詢,我們需要找出一種方法來消除這個重複的子查詢。為了做到這一點,Spark 在物理計劃階段做了一些優化。•在物理計劃層面,在維度表上運行上面構造的過濾,然後將結果廣播到事實表端,從而達到避免掃描無用的數據效果。
如果我們將 spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly 設置為 false,那麼 DPP 也可以在其他類型的 Join 上運行,比如 SortMergeJoin。在這種情況下,Spark 將估計 DPP 過濾器是否確實提高了查詢性能。DPP 可以極大地提高高度選擇性查詢的性能,例如,如果我們的查詢在5年的價值數據中過濾出其中一個月的數據。
在 TPC-DS 基準測試中,102個查詢中的60個得到2到18倍的加速。
藉助 AQE、DPP、GPU 的支持以及 Kubernetes 的支持,性能提升的前景非常樂觀,我們應該可以看到 Spark 3 將在越來越多的公司使用。
本文主要翻譯自:《How does Apache Spark 3.0 increase the performance of your SQL workloads》https://blog.cloudera.com/how-does-apache-spark-3-0-increase-the-performance-of-your-sql-workloads/