close

▼ 關注「Flink 中文社區」,獲取更多技術乾貨▼


摘要:本文詳細介紹了 Flink + Hudi 湖倉一體化方案的原型構建。主要內容為:

Hudi
新架構與湖倉一體
最佳實踐
Flink on Hudi
Flink CDC 2.0 on Hudi

Tips:FFA 2021 重磅開啟,點擊「閱讀原文」即可報名~

GitHub 地址
歡迎大家給Flink點讚送 star~

一、Hudi

1. 簡介

Apache Hudi (發音為 「Hoodie」)在 DFS 的數據集上提供以下流原語:

插入更新 (如何改變數據集?)

增量拉取 (如何獲取變更的數據?)

Hudi 維護在數據集上執行的所有操作的時間軸 (timeline),以提供數據集的即時視圖。Hudi 將數據集組織到與 Hive 表非常相似的基本路徑下的目錄結構中。數據集分為多個分區,文件夾包含該分區的文件。每個分區均由相對於基本路徑的分區路徑唯一標識。

分區記錄會被分配到多個文件。每個文件都有一個唯一的文件 ID 和生成該文件的提交 (commit)。如果有更新,則多個文件共享相同的文件 ID,但寫入時的提交 (commit) 不同。

存儲類型 – 處理數據的存儲方式

寫時複製

純列式

創建新版本的文件

讀時合併

近實時

視圖 – 處理數據的讀取方式

讀取優化視圖 - 輸入格式僅選擇壓縮的列式文件

parquet 文件查詢性能

500GB 的延遲時間約為 30 分鐘

導入現有的 Hive 表

近實時視圖

混合、格式化數據

約 1-5 分鐘的延遲

提供近實時表

增量視圖

數據集的變更

啟用增量拉取

Hudi 存儲層由三個不同的部分組成:

元數據 – 它以時間軸的形式維護了在數據集上執行的所有操作的元數據,該時間軸允許將數據集的即時視圖存儲在基本路徑的元數據目錄下。時間軸上的操作類型包括:


提交 (commit),一次提交表示將一批記錄原子寫入數據集中的過程。單調遞增的時間戳,提交表示寫操作的開始。
清理 (clean),清理數據集中不再被查詢中使用的文件的較舊版本。
壓縮 (compaction),將行式文件轉化為列式文件的動作。

索引 - 將傳入的記錄鍵快速映射到文件 (如果已存在記錄鍵)。索引實現是可插拔的,Bloom 過濾器 - 由於不依賴任何外部系統,因此它是默認配置,索引和數據始終保持一致。Apache HBase - 對少量 key 更高效。在索引標記過程中可能會節省幾秒鐘。


數據 - Hudi 以兩種不同的存儲格式存儲數據。實際使用的格式是可插入的,但要求具有以下特徵 – 讀優化的列存儲格式 (ROFormat),默認值為 Apache Parquet;寫優化的基於行的存儲格式 (WOFormat),默認值為 Apache Avro。



2. 為什麼 Hudi 對於大規模和近實時應用很重要?

Hudi 解決了以下限制:

HDFS 的可伸縮性限制;


需要在 Hadoop 中更快地呈現數據;


沒有直接支持對現有數據的更新和刪除;


快速的 ETL 和建模;


要檢索所有更新的記錄,無論這些更新是添加到最近日期分區的新記錄還是對舊數據的更新,Hudi 都允許用戶使用最後一個檢查點時間戳。此過程不用執行掃描整個源表的查詢。


3. Hudi的優勢

HDFS 中的可伸縮性限制;


Hadoop 中數據的快速呈現;


支持對於現有數據的更新和刪除;


快速的 ETL 和建模。


以上內容主要引用於:《Apache Hudi 詳解》

二、新架構與湖倉一體


通過湖倉一體、流批一體,准實時場景下做到了:數據同源、同計算引擎、同存儲、同計算口徑。數據的時效性可以到分鐘級,能很好的滿足業務准實時數倉的需求。下面是架構圖:


MySQL 數據通過 Flink CDC 進入到 Kafka。之所以數據先入 Kafka 而不是直接入 Hudi,是為了實現多個實時任務復用 MySQL 過來的數據,避免多個任務通過 Flink CDC 接 MySQL 表以及 Binlog,對 MySQL 庫的性能造成影響。

通過 CDC 進入到 Kafka 的數據除了落一份到離線數據倉庫的 ODS 層之外,會同時按照實時數據倉庫的鏈路,從 ODS->DWD->DWS->OLAP 數據庫,最後供報表等數據服務使用。實時數倉的每一層結果數據會准實時的落一份到離線數倉,通過這種方式做到程序一次開發、指標口徑統一,數據統一。

從架構圖上,可以看到有一步數據修正 (重跑歷史數據) 的動作,之所以有這一步是考慮到:有可能存在由於口徑調整或者前一天的實時任務計算結果錯誤,導致重跑歷史數據的情況。

而存儲在 Kafka 的數據有失效時間,不會存太久的歷史數據,重跑很久的歷史數據無法從 Kafka 中獲取歷史源數據。再者,如果把大量的歷史數據再一次推到 Kafka,走實時計算的鏈路來修正歷史數據,可能會影響當天的實時作業。所以針對重跑歷史數據,會通過數據修正這一步來處理。

總體上說,這個架構屬於 Lambda 和 Kappa 混搭的架構。流批一體數據倉庫的各個數據鏈路有數據質量校驗的流程。第二天對前一天的數據進行對賬,如果前一天實時計算的數據無異常,則不需要修正數據,Kappa 架構已經足夠。

本節內容引用自:37 手遊基於 Flink CDC + Hudi 湖倉一體方案實踐

三、最佳實踐

1. 版本搭配

版本選擇,這個問題可能會成為困擾大家的第一個絆腳石,下面是hudi中文社區推薦的版本適配:

FlinkHudi1.12.20.9.01.13.10.10.0

建議用 Hudi master + Flink 1.13 這樣可以和 CDC connector 更好地適配。

2. 下載Hudi
https://mvnrepository.com/artifact/org.apache.Hudi/Hudi-Flink-bundle

目前 maven 中央倉庫,最新版本是 0.9.0 ,如果需要下載 0.10.0 版本 , 可以加入社區群,在共享文件中下載,也可以下載源碼自行編譯。

3. 執行
如果將 Hudi-Flink-bundle_2.11-0.10.0.jar 放到了 Flink/lib 下,則只需要如下執行即可,否則會出現各種找不到類的異常

bin/SQL-client.sh embedded

四、Flink on Hudi


新建 maven 工程,修改 pom 如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>Flink_Hudi_test</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <Flink.version>1.13.1</Flink.version> <Hudi.version>0.10.0</Hudi.version> <hadoop.version>2.10.1</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-core</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-streaming-java_2.11</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-jdbc_2.11</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-clients_2.11</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-table-api-java-bridge_2.11</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-table-common</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-table-planner_2.11</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-table-planner-blink_2.11</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-table-planner-blink_2.11</artifactId> <version>${Flink.version}</version> <type>test-jar</type> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>Flink-connector-mySQL-CDC</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.apache.Hudi</groupId> <artifactId>Hudi-Flink-bundle_2.11</artifactId> <version>${Hudi.version}</version> <scope>system</scope> <systemPath>${project.basedir}/libs/Hudi-Flink-bundle_2.11-0.10.0-SNAPSHOT.jar</systemPath> </dependency> <dependency> <groupId>mySQL</groupId> <artifactId>mySQL-connector-java</artifactId> <version>5.1.49</version> </dependency> </dependencies></project>

我們通過構建查詢insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mySQL_binlog 將創建的 MySQL 表,插入到 Hudi 里。


package name.lijiaqi;import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.Flink.table.api.EnvironmentSettings;import org.apache.Flink.table.api.SQLDialect;import org.apache.Flink.table.api.TableResult;import org.apache.Flink.table.api.bridge.java.StreamTableEnvironment;public class MySQLToHudiExample { public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); tableEnv.getConfig().setSQLDialect(SQLDialect.DEFAULT); // 數據源表 String sourceDDL = "CREATE TABLE mySQL_binlog (\n" + " id INT NOT NULL,\n" + " name STRING,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mySQL://127.0.0.1:3306/test', \n"+ " 'driver' = 'com.mySQL.jdbc.Driver', \n"+ " 'username' = 'root',\n" + " 'password' = 'dafei1288', \n" + " 'table-name' = 'test_CDC'\n" + ")"; // 輸出目標表 String sinkDDL = "CREATE TABLE t2(\n" + "\tuuid VARCHAR(20),\n"+ "\tid INT NOT NULL,\n" + "\tname VARCHAR(40),\n" + "\tdescription VARCHAR(40),\n" + "\tts TIMESTAMP(3)\n"+// "\t`partition` VARCHAR(20)\n" + ")\n" +// "PARTITIONED BY (`partition`)\n" + "WITH (\n" + "\t'connector' = 'Hudi',\n" + "\t'path' = 'hdfs://172.19.28.4:9000/Hudi_t4/',\n" + "\t'table.type' = 'MERGE_ON_READ'\n" + ")" ; // 簡單的聚合處理 String transformSQL = "insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mySQL_binlog"; tableEnv.executeSQL(sourceDDL); tableEnv.executeSQL(sinkDDL); TableResult result = tableEnv.executeSQL(transformSQL); result.print(); env.execute("mySQL-to-Hudi"); }}

查詢 Hudi

package name.lijiaqi;import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.Flink.table.api.EnvironmentSettings;import org.apache.Flink.table.api.SQLDialect;import org.apache.Flink.table.api.TableResult;import org.apache.Flink.table.api.bridge.java.StreamTableEnvironment;public class ReadHudi { public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); tableEnv.getConfig().setSQLDialect(SQLDialect.DEFAULT); String sourceDDL = "CREATE TABLE t2(\n" + "\tuuid VARCHAR(20),\n"+ "\tid INT NOT NULL,\n" + "\tname VARCHAR(40),\n" + "\tdescription VARCHAR(40),\n" + "\tts TIMESTAMP(3)\n"+// "\t`partition` VARCHAR(20)\n" + ")\n" +// "PARTITIONED BY (`partition`)\n" + "WITH (\n" + "\t'connector' = 'Hudi',\n" + "\t'path' = 'hdfs://172.19.28.4:9000/Hudi_t4/',\n" + "\t'table.type' = 'MERGE_ON_READ'\n" + ")" ; tableEnv.executeSQL(sourceDDL); TableResult result2 = tableEnv.executeSQL("select * from t2"); result2.print(); env.execute("read_Hudi"); }}

展示結果


五、Flink CDC 2.0 on Hudi


上一章節,我們使用代碼形式構建實驗,在本章節里,我們直接使用官網下載的 Flink 包來構建實驗環境。

1. 添加依賴

添加如下依賴到 $Flink_HOME/lib 下:

Hudi-Flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然後構建)


hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException)


Flink-SQL-connector-mySQL-CDC-2.0.0.jar


Flink-format-changelog-json-2.0.0.jar


Flink-SQL-connector-Kafka_2.11-1.13.2.jar


注意,在尋找 jar 的時候,CDC 2.0 更新過 group id ,不再試com.alibaba.ververica 而是改成了 com.ververica



2. Flink SQL CDC on Hudi

創建 MySQL CDC 表

CREATE TABLE mySQL_users ( id BIGINT PRIMARY KEY NOT ENFORCED , name STRING, birthday TIMESTAMP(3), ts TIMESTAMP(3)) WITH ( 'connector' = 'mySQL-CDC', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'dafei1288', 'server-time-zone' = 'Asia/Shanghai', 'database-name' = 'test', 'table-name' = 'users' );

創建 Hudi 表

CREATE TABLE Hudi_users5( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, birthday TIMESTAMP(3), ts TIMESTAMP(3), `partition` VARCHAR(20)) PARTITIONED BY (`partition`) WITH ( 'connector' = 'Hudi', 'table.type' = 'MERGE_ON_READ', 'path' = 'hdfs://localhost:9009/Hudi/Hudi_users5');

修改配置,讓查詢模式輸出為表,設置 checkpoint

set execution.result-mode=tableau;set execution.checkpointing.interval=10sec;

進行輸入導入

INSERT INTO Hudi_users5(id,name,birthday,ts, `partition`) SELECT id,name,birthday,ts,DATE_FORMAT(birthday, 'yyyyMMdd') FROM mySQL_users;

查詢數據

select * from Hudi_users5;

執行結果


3. 卡執行計劃


這個問題研究了很久,表面上很正常,日誌也沒有任何報錯,也可以看出來 CDC 起作用了,有數據寫入,但是就是卡在 hoodie_stream_write 上一動不動,沒有數據下發。感謝社區大佬 Danny Chan 的提點,可能是 checkpoint的問題,於是做了設置:

set execution.checkpointing.interval=10sec;

終於正常:


至此,Flink + Hudi 湖倉一體化方案的原型構建完成。

參考鏈接

[1] https://blog.csdn.net/qq_37095882/article/details/103714548
[2] https://blog.csdn.net/weixin_49218925/article/details/115511022

FlinkForwardAsia 2021
報名現已開放

Flink Forward Asia 2021 重磅啟動!FFA 2021 將於 12 月 4-5 日在北京·國家會議中心舉辦,預計將有 3000+ 開發者參與,探討交流 Flink 最新動態。報名通道已開啟,掃描下圖二維碼,或點擊文末「閱讀原文」即可報名 FFA 2021~


更多 Flink 相關技術問題,可掃碼加入社區釘釘交流群~

戳我,報名 FFA 2021 大會!

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()