1. 摘要
Robinhood 的使命是使所有人的金融民主化。Robinhood 內部不同級別的持續數據分析和數據驅動決策是實現這一使命的基礎。我們有各種數據源——OLTP 數據庫、事件流和各種第 3 方數據源。需要快速、可靠、安全和以隱私為中心的數據湖攝取服務來支持各種報告、關鍵業務管道和儀錶板。不僅在數據存儲規模和查詢方面,也在我們在數據湖支持的用例方面,我們從最初的數據湖版本[1]都取得了很大的進展。在這篇博客中,我們將描述如何使用各種開源工具構建基於變更數據捕獲的增量攝取,以將我們核心數據集的數據新鮮延遲從 1 天減少到 15 分鐘以下。我們還將描述大批量攝取模型中的局限性,以及在大規模操作增量攝取管道時學到的經驗教訓。
2. 數據湖和生態系統
Robinhood 的數據湖存儲和計算基礎架構是為我們的許多數據驅動功能提供支持的基石,例如業務分析儀錶板和產品改進見解。它也是為業務和臨時報告和分析運行大規模數據處理的數據源。此外,生態系統會影響以隱私為中心的原語,例如旨在保護用戶隱私的匿名化和訪問控制。
主要的 OLTP(在線事務處理)數據庫由 Postgres RDS 管理;Amazon S3 是 Data Lake 存儲,它為我們的 Data Lake 提供經濟高效且可擴展的存儲層;我們主要使用 Apache Spark 運行生產批處理管道;我們的儀錶板由 Trino 分布式 SQL 查詢引擎提供支持;Apache Hadoop Yarn 管理用於運行 Apache Spark 作業的計算集群;Apache Hive Metastore 為查詢引擎管理和提供表模式;Apache Airflow 是工作流編排服務。下圖是具有計算生態系統的數據湖
在整篇文章中我們使用指標「數據新鮮度」來比較下面不同的數據攝取架構,此指標為源數據庫中的表中發生的更改在相應的 Data Lake 表中可見提供了時間延遲。
3. 大批量攝取的限制
作為數據湖演進的第一步,我們首先使用在線數據庫的只讀副本獲取在線數據庫的每日快照。攝取這些表的完整快照會導致數據湖表的寫入放大率很高。即使對於一個有數十億行的表來說,一天只有幾十萬行的變化,攝取該表的完整快照也會導致讀取和寫入整個表。此外當使用實時副本(而不是作為上游的數據庫備份)時,在只讀副本 I/O 性能方面會出現瓶頸,這會導致快照時間過長,從而導致較大的攝取延遲。即使採用了諸如通過分區讀取並行化 I/O 之類的技術,這種攝取架構也無法在一小時內交付數據。Robinhood 確實需要保持數據湖的低數據新鮮度。許多過去在市場交易時間之後或之前以每日節奏運行的批處理管道必須以每小時或更高的頻率運行,以支持不斷發展的用例。很明顯我們需要更快的攝取管道將在線數據庫複製到數據湖。
4. 新架構
實現 Data Lake 較低數據新鮮度的更好方法是增量攝取。增量攝取是一種眾所周知的技術,用於為數據湖構建有效的攝取管道。在這裡攝取管道不是拍攝快照並將它們作為一個整體轉儲到 Data Lake,而是以流方式使用 OLTP 數據庫的預寫日誌並將它們攝取到 Data Lake 表中,就像數據庫到數據庫複製的方式一樣。從概念上講,我們有一個兩階段管道。
•變更數據捕獲 (CDC) 服務使用 OLTP 數據庫中的預寫日誌 (WAL) 數據並將它們緩衝在變更日誌隊列中。•數據攝取作業定期或以連續方式拖尾隊列並更新數據湖「原始」表。
下圖是增量攝取組件
中間更改日誌隊列允許分離兩個階段之間的關注點,這兩個階段將能夠獨立運行,並且每個階段都可以暫停而不影響另一個階段。隊列提供了必要的隔離,以便將數據攝取到數據湖的任何延遲都不會對 CDC 造成背壓。在第一階段,我們選擇 Debezium 作為變更數據捕獲 (CDC) 提供商。Debezium 是一個構建在 Kafka Connect 之上的開源分布式變更數據捕獲平台,Debezium 帶有一個經過充分證明的一流 Postgres CDC 連接器。根據我們的基準測試,我們發現 Debezium 可以輕鬆處理我們預計的負載量,我們已經設置 Debezium 使用開源的 Confluent Schema Registry 以 avro 編碼格式將更改記錄寫入 Kafka,與 json 編碼相比,Avro 編碼提供了更好的性能。在第二階段,我們使用 Apache Hudi 從 Kafka 增量攝取變更日誌,以創建數據湖表。Apache Hudi 是一個統一的數據湖平台,用於在數據湖上執行批處理和流處理,Apache Hudi 帶有一個功能齊全的基於 Spark 的開箱即用的攝取系統,稱為 Deltastreamer,具有一流的 Kafka 集成和一次性寫入功能,與不可變數據不同,我們的 CDC 數據有相當大比例的更新和刪除,Hudi Deltastreamer 利用其可插入的記錄級索引在 Data Lake 表上執行快速高效的 upserts,Hudi 通過自動清理舊文件版本、數據Clustering、Hive表模式同步和文件大小調整來自我管理其表,以寫入大小合適的文件,原始表當前以 Hudi 的寫時複製模式存儲,該模式提供原生列式讀取性能。
5. 效果總結
我們已經部署了增量攝取管道,以將 1000 個 Postgres 表攝取到數據湖中。在新架構之前,由於快照的限制和所涉及的成本,這些表只能保證能夠以每天的節奏進行快照。使用這種新架構,Data Lake 用戶很高興看到關鍵表的數據新鮮度從 24 小時縮短到 15 分鐘以下。大批量快照運行時間顯示快照表的運行時間長。請注意由於只讀副本 I/O 瓶頸,其中許多表的快照需要按順序運行。
顯示大批量快照的大批量快照運行計劃每天僅運行一次,這是因為從數據庫中快照所有表的周轉時間很長。
新的增量攝取數據新鮮度顯示新攝取系統的端到端數據新鮮度約為 5 分鐘。
6. 經驗教訓
在本節中我們將分享在大規模構建增量攝取管道時學到的經驗教訓。我們希望這對任何希望為他們的數據湖踏上類似旅程的人來說都是有價值的。
7. 可縮放的初始引導程序
對數據湖的增量攝取仍然需要源表的初始快照。Debezium 確實提供了初始快照模式,但需要查詢主 RDS 實例,我們不想查詢主 RDS 實例以進行快照,以避免生產 OLTP 查詢與初始快照查詢之間的任何資源競爭。此外,我們需要通過以無鎖方式運行並發分區查詢以及從數據庫備份中獲取快照來優化初始快照時間的能力。出於這些原因,我們在 Apache Hudi Deltastreamer 之上提供了專用的只讀副本並實現了一個自定義快照器,它利用 Spark 運行並發分區快照查詢來獲取表的初始快照,Apache Hudi 的可插拔源框架允許我們用幾行代碼無縫實現這一點。對於帶外初始快照,我們需要在增量攝取和快照之間切換時仔細跟蹤 CDC 流中的正確水印,使用 Kafka,數據攝取作業的 CDC 水印轉換為 Kafka 偏移量,這標誌着要應用於快照表的開始更改日誌事件,如果我們選擇一個任意的 Kafka 偏移量,我們最終可能會錯過一些應用到 Data Lake 表的更改事件。
從概念上講,我們需要 3 個階段來執行正確的快照並過渡到增量攝取:
•保存最新的 Kafka 偏移量,以在切換到增量攝取時用於重播變更日誌。設「Tₛ」為最新事件的源時間。•確保只讀副本在時間「Tₛ + Δ」時是最新的,其中 Δ 表示捕獲 kafka 偏移量以及額外緩衝時間時的 Debezium 延遲。否則,整個方程式將無法保證 0% 的數據丟失。從只讀副本中獲取表的初始快照並創建 Data Lake 表•從之前存儲的 kafka 偏移量開始消費並執行表的增量攝取。一旦增量攝取開始發生,將配置單元表定義同步到數據的最新位置,下游消費者現在將能夠查詢新引導的表。
下圖是使用引導架構的增量攝取架構
從專用只讀副本進行快照具有局限性,例如副本端的 I/O 瓶頸以及 24 * 7 在線維護只讀副本的成本開銷。我們正在探索一種對 OLTP 數據庫進行按需備份並使用 AWS S3 導出發布到 S3 的方法。然後我們可以依靠大規模處理這些 S3 導出並構建初始快照,這種機制可能允許更快的快照並克服只讀副本端的一些 I/O 瓶頸。
8. 使用 Postgres 邏輯複製監控背壓風險
Postgres 邏輯複製需要 CDC 連接器直連主 RDS。Postgres 邏輯複製協議保證保留 WAL 日誌文件,直到 Debezium 完全處理它們。如果 Debezium 卡住或無法跟上消耗 WAL 日誌的速度,這可能會導致 WAL 日誌文件累積並耗盡可用磁盤空間,Debezium 社區建議密切監視滯後消息,我們的 Debezium 負載測試也讓我們對 Debezium 能夠處理預計的變更速度增加充滿信心。
9. 自動化恢復
從每日快照切換到增量攝取的副作用之一是攝取工作流變得有狀態。管道可能處於快照或增量攝取狀態。此外,還需要執行架構升級、監控和數據質量驗證等其他操作,新表和數據庫需要定期地加入。端到端管道涉及不同的系統——在線 CDC 世界和數據湖的批處理/流攝取。為 1000 個表執行入職和常規操作需要適當的狀態管理和自動化。我們意識到我們需要在內部構建一流的編排服務,該服務將利用 Apache Airflow 來管理攝取管道、跟蹤載入和表狀態並自動處理狀態轉換和其他維護,這有助於我們大規模運營管道。
10. 並非所有表都是平等的
當談到這些表對我們的關鍵用例的重要性時,pareto原則是有效的,我們有一小部分關鍵表需要在 15 分鐘內保證數據新鮮度,我們採取了一種方法,根據表的重要性將表分類為不同的層,高度關鍵的表被標記為第 0 層,對於這些表,我們提供了一個單獨的 CDC 複製槽,以將這些關鍵表的 CDC 通道與其他表的通道隔離。此外我們為 Hudi deltastreamer 提供了專門的資源,以持續攝取增量更改日誌,並能夠在 5 -15 分鐘內保持數據最新。對於較低優先級的表,Hudi deltastreamer 配置為以批處理模式每 15 分鐘運行一次。
11. 管理 Postgres 模式更新
我們的業務是將表從在線 OLTP 世界複製到 Data Lake 世界,複製的數據不是不透明的,而是具有適當的模式,並且複製管道保證了將在線表模式轉換為數據湖的模式的明確定義的行為。鑑於 Data Lakes 還能夠存儲數據更改的整個歷史,因此在線和 Data Lake 世界的向後兼容性意味着什麼不同。例如,在在線世界中,向 postgres 添加一個不可為空的列是非常好的,但不會遵守用於存儲動態變更日誌的 Avro(或 Protobuf)的模式演變規則。擁有明確定義的架構演化合約有助於保持數據湖管道更加穩定。我們發現大多數時候,Schema更改涉及添加新列,我們正在使用 Debezium 功能來凍結我們從 Postgres 表中讀取的列集,並依靠重新引導表來處理模式升級,我們計劃為端到端管道添加模式兼容性檢測機制,以減少重新引導的次數。
12. 未來規劃
我們看到使用增量攝取的原始數據湖表的採用速度更快,並且我們正在不斷努力提高管道的可靠性。以下是我們正在着手的一些後續步驟:
•數據質量保證:我們實施了以不同頻率運行的通用和自定義數據質量和完整性檢查,以發現複製數據中的差異,我們正在努力利用 Apache Hudi 的預提交驗證支持在每批提交之前運行自定義驗證。•進一步減少數據新鮮度滯後:我們目前使用的是 Apache Hudi Copy-On-Write 格式。在這種模式下,我們可以看到大約 5-15 分鐘範圍內的數據新鮮度,我們計劃探索 Apache Hudi 的 Merge-On-Read 格式,以進一步降低數據新鮮度。•流式數據湖:Apache Hudi 提供增量處理能力,就像數據庫變更日誌一樣,我們未來的工作涉及使用這種原語並構建端到端流管道以有效地將更改滲透到下游表,這也將使我們能夠以實時流媒體的方式執行隱私保護操作,例如屏蔽和匿名化。•用於服務間數據交換的 CDC 服務:CDC 已在 Robinhood 中用於為數據湖的增量攝取提供更改流,我們正在研究使用 CDC 流在各種在線微服務之間進行可靠的數據交換。•數據計算:我們一直致力於提高基於 Apache Spark 和 Trino 構建的數據計算平台的可用性、效率和性能,以支持關鍵數據計算工作負載。
這些是在 Robinhood 數據基礎設施團隊工作的激動人心的時刻,因為我們已經開始構建下一代 Robinhood 數據湖。
推薦閱讀Onehouse 對Apache Hudi開源社區的承諾
[1]最初的數據湖版本:[https://robinhood.engineering/data-lake-at-robinhood-3e9cdf963368](https://robinhood.engineering/data-lake-at-robinhood-3e9cdf963368)