close

作者簡介

cxzl25,攜程軟件技術專家,關注大數據領域生態建設,對分布式計算和存儲、調度等方面有濃厚興趣。


一、前言

Data lineage includes the data origin, what happens to it and where it moves over time. Data lineage gives visibility while greatly simplifying the ability to trace errors back to the root cause in a data analytics process. ──百科Data lineage

大數據時代,數據的來源極其廣泛,各種類型的數據在快速產生,數據也是爆發性增長。從數據的產生,通過加工融合流轉產生新的數據,到最終消亡,數據之間的關聯關係可以稱之為數據血緣關係。

數據血緣是元數據管理、數據治理、數據質量的重要一環,追蹤數據的來源、處理、出處,對數據價值評估提供依據,描述源數據流程、表、報表、即席查詢之間的流向關係,表與表的依賴關係、表與離線ETL任務,調度平台,計算引擎之間的依賴關係。數據倉庫是構建在Hive之上,而Hive的原始數據往往來自於生產DB,也會把計算結果導出到外部存儲,異構數據源的表之間是有血緣關係的。

數據血緣用途:

追蹤數據溯源:當數據發生異常,幫助追蹤到異常發生的原因;影響面分析,追蹤數據的來源,追蹤數據處理過程。

評估數據價值:從數據受眾、更新量級、更新頻次等幾個方面給數據價值的評估提供依據。

生命周期:直觀地得到數據整個生命周期,為數據治理提供依據。

安全管控:對源頭打上敏感等級標籤後,傳遞敏感等級標籤到下游。


本文介紹攜程數據血緣如何構建及應用場景。第一版T+1構建Hive引擎的表級別的血緣關係,第二版近實時構建Hive,Spark,Presto多個查詢引擎和DataX傳輸工具的字段級別血緣關係。二、構建血緣的方案2.1 收集方式

方案一:只收集SQL,事後分析。

當SQL執行結束,收集SQL到DB或者Kafka。

優點:當計算引擎和工具不多的時候,語法相對兼容的時候,用Hive自帶的LineageLogger重新解析SQL可以獲得表和字段級別的關係。

缺點:重放SQL的時候可能元數據發生改變,比如臨時表可能被Drop,沒有臨時自定義函數UDF,或者SQL解析失敗。

方案二:運行時分析SQL並收集。

當SQL執行結束後立即分析Lineage,異步發送到Kafka。

優點:運行時的狀態和信息是最準確的,不會有SQL解析語法錯誤。

缺點:需要針對各個引擎和工具開發解析模塊,解析速度需要足夠快。

2.2 開源方案Apache Atlas

Apache Atlas是Hadoop社區為解決Hadoop生態系統的元數據治理問題而產生的開源項目,它為Hadoop集群提供了包括數據分類、集中策略引擎、數據血緣、安全和生命周期管理在內的元數據治理核心能力。官方插件支持HBase、Hive、Sqoop、Storm、Storm、Kafka、Falcon組件。

Hook在運行時採集血緣數據,發送到Kafka。Atlas消費Kafka數據,將關係寫到圖數據庫JanusGraph,並提供REST API。

其中Hive Hook支持表和列級別血緣,Spark需要使用GitHub的hortonworks-spark/spark-atlas-connector,不支持列級別,Presto則不支持。

Linkedin DataHub

WhereHows項目已於2018年重新被LinkedIn公司設計為DataHub項目。它從不同的源系統中採集元數據,並進行標準化和建模,從而作為元數據倉庫完成血緣分析。

社區提供了一個Demo,演示地址:https://demo.datahubproject.io/

與Airflow集成較好,支持數據集級別血緣,字段級別在2021Q3的Roadmap。

三、攜程方案

攜程採用了方案二,運行時分析SQL並收集分析結果到Kafka。由於開源方案在現階段不滿足需求,則自行開發。

由於當時缺少血緣關係,對數據治理難度較大,表級別的血緣解析難度較低,表的數量遠小於字段的數量,早期先快速實現了表級別版本。

在16-17年實現和上線了第一個版本,收集常用的工具和引擎的表級別的血緣關係,T+1構建關係。

在19年迭代了第二個版本,支持解析Hive,Spark,Presto多個查詢引擎和DataX傳輸工具的字段級別血緣關係,近實時構建關係。

四、第一個版本-表級別血緣關係4.1 處理流程

針對Hive引擎開發了一個Hook,實現ExecuteWithHookContext接口,從HookContext可以獲得執行計劃,輸入表,輸出表等豐富信息,異步發送到Kafka,部署的時候在hive.exec.post.hooks添加插件即可。

在17年引入Spark2後,大部分Hive作業遷移到Spark引擎上,這時候針對Spark SQL CLI快速開發一個類似Hive Hook機制,收集表級別的血緣關係。

傳輸工具DataX作為一個異構數據源同步的工具,單獨對其開發了收集插件。

在經過解析處理後,將數據寫到圖數據庫Neo4j,提供元數據系統展示和REST API服務,落地成Hive關係表,供用戶查詢和治理使用。

4.2 效果

在元數據系統上,可以查看一張表多層級的上下游血緣關係,在關係邊上會有任務ID等一些屬性。

4.3 痛點

隨着計算引擎的增加,業務的增長,表級別的血緣關係已經不滿足需求。

覆蓋面不足,缺少Spark ThriftServer , Presto引擎,缺少即席查詢平台,報表平台等。

關係不夠實時,期望寫入表後可以快速查詢到關係,用戶可以直觀查看輸入和輸出,數據質量系統,調度系統可以根據任務ID查詢到輸出表,對表執行質量校驗任務。

圖數據庫Neo4j社區版為單機版本,存儲數量有限,穩定性欠佳,當時使用的版本較低,對邊不能使用索引(3.5支持),這使得想從關係搜索到關聯的上下游較為麻煩。


五、第二版本-字段級別血緣關係

之前實現的第一個版本,對於細粒度的治理和追蹤還不夠,不僅缺少對字段級別的血緣關係,也不支持採集各個系統的埋點信息和自定義擴展屬性,難以追蹤完整鏈路來源,並且關係是T+1,不夠實時。

針對各個計算引擎和傳輸工具DataX開發不同的解析插件,將解析好的血緣數據發送到Kafka,實時消費Kafka,把關係數據寫到分布式圖數據JanusGraph。

5.1 傳輸工具DataX

阿里開源的Druid是一個 JDBC 組件庫,包含數據庫連接池、SQL Parser 等組件。通過重寫MySqlASTVisitor、SQLServerASTVisitor來解析MySQL / SQLServer的查詢SQL,獲得列級別的關係。

5.2 計算引擎

計算引擎統一格式,收集輸入表、輸出表,輸入字段、輸出字段,流轉的表達式等一些信息。

Hive

參考 org.apache.hadoop.hive.ql.hooks.LineageLogger 實現,異步發送血緣數據到 Kafka。

Atlas的HiveHook也是實現ExecuteWithHookContext接口,從HookContext獲得LineageInfo,也可以參考HIVE-19288 引入的org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook,採集更多引擎相關的信息。

其中遇到幾個問題:

通過HiveServer2執行獲取的start time不正確

HIVE-10957 QueryPlan's start time is incorrect in certain cases

獲取執行計劃空指針,導致收集失敗

HIVE-12709 further improve user level explain

獲取執行計劃有可能出現卡住,可以加個調用超時。

Spark前置條件:引入 SPARK-19558 Add config key to register QueryExecutionListeners automatically,實現自動註冊QueryExecutionListener。

實現方式:通過實現QueryExecutionListener接口,在onSuccess回調函數拿到當前執行的QueryExecution,通過LogicalPlan的output方法,獲得所有Attribute,利用NamedExpression的exprId映射關係,對其進行遍歷和解析,構建列級別關係。

覆蓋範圍:Spark SQL CLI、Thrift Server、使用Dataset/DataFrame API(如spark-submit、spark-shell、pyspark)

遇到問題:

使用analyzedPlan而不是optimizedPlan,optimizer的執行計劃可能會丟失一些信息,可以在analyzedPlan的基礎上apply一些有助於分析的Rule,如CombineUnions。


傳遞的初始化用的hiveconf/hivevar變量被Thrift Server忽略,導致初始化Connection沒有辦法埋點。


打上Patch SPARK-13983 ,可以實現第一步,傳遞變量,但是這個變量在每次執行新的statement都重新初始化,導致用戶set的變量不可更新。後續給社區提交PR SPARK-26598,修復變量不可更新的問題。

SPARK-13983 Fix HiveThriftServer2 can not get "--hiveconf" and "--hivevar" variables since 2.0

SPARK-26598 Fix HiveThriftServer2 cannot be modified hiveconf/hivevar variables

Drop Table 的限制,DropTableCommand執行成功的時候,該表不一定在之前存在過,如果在Drop之前存在過,元數據也已經被刪除了,無從考證。

在DropTableCommand增加了一個標誌位,真正在有執行Drop操作的話再置為True,保證收集的血緣數據是對的。

使用Transform用戶自定義腳本的限制

Transform不像java UDF,只輸入需要用到的字段即可,而是需要將所有後續用到的字段都輸入到自定義腳本,腳本再決定輸出哪些字段,這其中列與列之間的映射關係無法通過執行計劃獲得,只能簡單的記錄輸出列的表達式,如transform(c1,c2,c3) script xxx.py to c4。

Presto

開發Presto EventListener Plugin,實現EventListener接口,從queryCompleted回調函數的QueryCompletedEvent解析得到相應的信息。

上線的時候遇到一個無法加載Kafka加載StringSerializer的問題(StringSerializer could not be found)。

Kafka客戶端使用 Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()) 來加載Class,優先從當前線程的ContextClassLoader加載,與Presto的ThreadContextClassLoader有衝突,需要初化始KafkaProducer的時候,將ContextClassLoader暫時置為NULL。https://stackoverflow.com/a/50981469/1673775

5.3 圖數據庫JanusGraph

JanusGraph是一個開源的分布式圖數據庫。具有很好的擴展性,通過多機集群可支持存儲和查詢數百億的頂點和邊的圖數據。JanusGraph是一個事務數據庫,支持大量用戶高並發地執行複雜的實時圖遍歷。

生產上,存儲我們使用Cassandra,索引使用Elasticsearch,使用Gremlin查詢/遍歷語言來讀寫JanusGraph,有上手難度,熟悉Neo4j的Cypher語法可以使用cypher-for-gremlin plugin。

以下是數據血緣寫入圖數據庫的模型,Hive字段單獨為一個Lable,關係型DB字段為一個Label,關係分兩種,LABELWRITE,LABELWRITE_TTL。

只有輸入沒有輸出(Query查詢操作),只有輸出沒有輸入(建表等DDL操作)也會強制綁定一個來源系統的ID及擴展屬性。

在生產上使用JanusGraph,存儲億級的血緣關係,但是在開發過程中也遇到了一些性能問題。

寫入速度優化

以DB名+表名+字段名作為唯一key,實現getOrCreateVertex,並對vertex id緩存,加速頂點的加載速度。

關係批量刪除

關係LABELWRITETTL表示寫入的關係有存活時間(TTL-Time to live),這是因為在批量刪除關係的時候,JanusGraph速度相當慢,而且很容易OOM。比如要一次性刪除,Label為WRITE,x=y,寫入時間小於等於某個時間的邊,這時候Vertex和Edge load到內存中,容易OOM。

g.E().hasLabel("WRITE").has("x",eq("y")).has("publishedDate",P.lte(new Date(1610640000))).drop().iterate()

嘗試使用多線程+分批次的方式,即N個線程,每個線程刪除1000條,速度也不太可接受。

這時候採用了折中的方案,需要刪除關係用另外一種Label來表示,並在創建Label指定了TTL,由於Cassandra支持cell level TTL,所以邊的數據會自動被刪除。但是ES不支持TTL,實現一個定時刪除ES過期數據即可。

5.4 覆蓋範圍

Zeus調度平台 (ETL操作INSERT、CTAS,QUERY)

Ad-Hoc即席查詢平台 (CTAS,QUERY)

報表平台 (QUERY)

元數據平台 (DDL操作)

GPU平台 (PySpark)

通過ETL任務ID,查詢任務ID,報表ID,都可以獲取到輸入,輸出的表和字段的關係。

5.5 局限

使用MapReduce、Spark RDD讀寫HDFS的血緣暫時沒有實現。

思路可以在JobClient.submitJob的時候採集輸入和輸出路徑,又或者通過HDFS的AuditLog、CallerContext來關聯。

5.6 效果

在第一版使用圖的方式展示血緣關係,在上下游關係較多的時候,顯示較為混亂,第二版改成樹狀表格的方式展示。

字段operator在調度系統Zeus被轉換成hive_account,最後輸出是ArtNova報表系統的一張報表。

六、實際應用場景6.1 數據治理

通過血緣關係篩選,每天清理數千張未使用的臨時表,節約空間。

作為數據資產評估的依據,統計表、字段讀寫次數,生成的表無下游訪問,包括有沒有調度任務,報表任務,即席查詢。

6.2 元數據管理

統計一張表的生成時間,而不是統計整個任務的完成時間。

數據異常,或者下線一張表、一個字段的時候,可以找到相關的ETL任務或者報表任務,及時通知下游。

統計表的使用熱度,顯示趨勢。

6.3 調度系統

得益於在圖數據庫JanusGraph可以使用關係邊的key作為索引,可以根據任務ID可以輕鬆獲得該任務輸入和輸出表。

當配置一個任務A的依賴任務列表的時候,可以使用推薦依賴,檢查依賴功能,獲得任務A的所有輸入表,再通過輸入的表獲得寫入任務ID列表,即為任務A所需依賴的任務列表。

在任務結束後,獲取該任務所有輸出的表,進行預配的規則進行數據質量校驗。


6.4 敏感等級標籤

當源頭的數據來自生產DB時,生產DB有些列的標籤已打上了敏感等級,通過血緣關係,下游的表可以繼承敏感等級,自動打上敏感標籤。

七、總結

以上描述了攜程如何構建表和字段級別的血緣關係,及在實際應用的場景。

隨着業務需求和數據的增長,數據的加工流程越來越複雜,構建一套數據血緣,可以輕鬆查詢到數據之間的關係,進行表和字段級的血緣追溯,在元數據管理,數據治理,數據質量上承擔重要一環。

團隊招聘信息

我們是攜程集團的大數據平台研發團隊,主要負責攜程大數據平台的建設,包括但不限於Hadoop生態源碼二次開發,任務調度,查詢平台的開發,致力於為集團提供穩定、高效、易用的大數據存儲和計算服務,實現高效的資源調度,打造服務於所有業務的數據平台產品、服務與應用。

團隊懷有前瞻的技術視野,積極擁抱開源建設,緊跟業界技術趨勢,在這裡有濃厚的技術氛圍,你可以和團隊成員一同參與開源建設,深入探索和交流技術原理,也有技術實施的廣闊場景。

簡歷投遞郵箱:tech@trip.com,郵件標題:【姓名】-【攜程大數據平台】-【投遞職位方向】。

【推薦閱讀】

攜程平台化常態化數據治理之路用數據描述和驅動業務,攜程指標標準化管理實踐百萬QPS,秒級延遲,攜程基於實時流的大數據基礎層建設實時數據聚合怎麼破

「攜程技術」公眾號

分享,交流,成長



arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()