
1. 介紹
在Apache Hudi中,Hudi的一條數據使用HoodieRecord這個類表示,其中包含了hoodie的主鍵,record的分區文件位置,還有今天本文的關鍵,payload。payload是一個條數據的內容的抽象,決定了同一個主鍵的數據的增刪改查邏輯也決定了其序列化的方式。通過對payload的自定義,可以實現數據的靈活合併,數據的自定義編碼序列化等,豐富Hudi現有的語義,提升性能。
2. 場景
包括但不限於如下場景中,我們可以通過自定義payload來實現靈活的需求。
•實現同一個主鍵的數據非row level replace語義的合併,如mvcc語義等•實現同一個主鍵下多時間戳數據靈活排序的語義•實現輸出redo/undo log的效果•實現自定義序列化邏輯
3. 作用方式
首先我們回顧一下一條HoodieRecord在Spark環境中使用RDD API upsert寫入MOR表的生命周期。

注意:在這個過程中,shuffle/寫入文件/磁盤spill的時候,都需要保證數據是已經被序列化過的格式。
4. 實現方式
在Hudi中,默認的payload實現是DefaultHoodieRecordPayload,它是OverwriteWithLatestAvroPayload子類。而OverwriteWithLatestAvroPayload這個類繼承了BaseAvroPayload並implementsHoodieRecordPayload這個接口。
其中BaseAvroPayload決定了數據的序列化方式,而HoodieRecordPayload決定了數據的合併方式。後者是必須使用的,但是前者不是。下面來分別分析他們的實現。
BaseAvroPayload/** * Base class for all AVRO record based payloads, that can be ordered based on a field. */public abstract class BaseAvroPayload implements Serializable { /** * Avro data extracted from the source converted to bytes. */ public final byte[] recordBytes; /** * For purposes of preCombining. */ public final Comparable orderingVal; /** * Instantiate {@link BaseAvroPayload}. * * @param record Generic record for the payload. * @param orderingVal {@link Comparable} to be used in pre combine. */ public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0]; this.orderingVal = orderingVal; if (orderingVal == null) { throw new HoodieException("Ordering value is null for record: " + record); } }}
首先BaseAvroPayloadimplements了Serializable接口,標誌着這個類和它的子類都是為了序列化而設計的,大家在繼承的時候需要注意子類相關attribute的可序列化問題。
構造器傳入了GenericRecord和一個Comparable的變量。由於Hudi使用avro作為內部的行存序列化格式,所以輸入的數據需要以GenericRecord的形式傳遞給payload。BaseAvroPayload會將數據直接序列化成binary待IO使用。這裡的假設是我們只需要做row level操作,直接操作整行的二進制數據毫無疑問是非常高效的,這裡的orderingVal是因為基於行級別的record比較在RDBMS的CDC中是非常常見的,所以增加了這個字段。這樣處理之後,只需保證comparable的變量也是可序列化的,這個類的所有attribute都已經是可序列化的格式了,使用任意序列化框架直接傳輸即可。
HoodieRecordPayload/** * Every Hoodie table has an implementation of the <code>HoodieRecordPayload</code> This abstracts out callbacks which depend on record specific logic. */@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable { /** * This method is deprecated. Please use this {@link #preCombine(HoodieRecordPayload, Properties)} method. */ @Deprecated @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) T preCombine(T oldValue); /** * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to insert/upsert by taking in a property map. * Implementation can leverage the property to decide their business logic to do preCombine. * * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with. * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. * * @return the combined value */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) default T preCombine(T oldValue, Properties properties) { return preCombine(oldValue); } /** * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs. */ @Deprecated @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException; /** * This methods lets you write custom merging/combining logic to produce new values as a function of current value on storage and whats contained * in this object. Implementations can leverage properties if required. * <p> * eg: * 1) You are updating counters, you may want to add counts to currentValue and write back updated counts * 2) You may be reading DB redo logs, and merge them with current image for a database row on storage * </p> * * @param currentValue Current value in storage, to merge/combine this payload with * @param schema Schema used for record * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record. */ default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { return combineAndGetUpdateValue(currentValue, schema); } /** * This method is deprecated. Refer to {@link #getInsertValue(Schema, Properties)} for java docs. * @param schema Schema used for record * @return the {@link IndexedRecord} to be inserted. */ @Deprecated @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) Option<IndexedRecord> getInsertValue(Schema schema) throws IOException; /** * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a new value for the given * HoodieKey, wherein there is no existing record in storage to be combined against. (i.e insert) Return EMPTY to skip writing this record. * Implementations can leverage properties if required. * @param schema Schema used for record * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. * @return the {@link IndexedRecord} to be inserted. */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) default Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException { return getInsertValue(schema); } /** * This method can be used to extract some metadata from HoodieRecordPayload. The metadata is passed to {@code WriteStatus.markSuccess()} and * {@code WriteStatus.markFailure()} in order to compute some aggregate metrics using the metadata in the context of a write success or failure. * @return the metadata in the form of Map<String, String> if any. */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) default Option<Map<String, String>> getMetadata() { return Option.empty(); }}
這個類的注釋寫得非常清楚,其中每個方法都有定義兩個個不同接口,截止本文發出時候(0.9.0版本),部分內部邏輯還在使用deprecated的舊版本,所以在使用時需要注意,邏輯最好放在舊接口裡。
簡單來說,preCombine這個方法定義了兩個payload合併的邏輯,在兩個場景下會被調用:
1.當deduplicated 開啟時,寫入的數據兩兩合併時用到2.在MOR表發生compaction時,兩條從log中讀取的payload合併時用到3.MOR表使用RT視圖讀取時
而combineAndGetUpdateValue則定義了寫入數據和baseFile中的數據(這裡已經被轉化成avro的行存格式)的合併方式。通常情況下,這合併邏輯應該和preCombine保持語義上的一致。
最後getInsertValue則定義了如何將數據從payload形式轉化成GenericRecord。在Hoodie相關的WriteHandle中被大量使用。通常是被用在寫入Log/BaseFile時調用的。
幾點額外注意的是:
1.combineAndGetUpdateValue和getInsertValue返回的都是Option,在這裡,如果返回Option.empty(),就是指數據刪除的意思。EmptyHoodieRecordPayload正是這一邏輯的payload表達,如果preCombine的返回結果是刪除,則可以返回這個類的實例。而hoodie中,在insert和upsert中通過添加_hoodie_is_deleted字段來實現刪除的原理,本質上也是在payload中判斷到這個字段,就返回空來實現的。2.不論是否繼承BaseAvroPayload這個類/是否需要Comparable類型的orderingVal, 最好保留(GenericRecord, Comparable)這個構造器,因為Hudi中存在反射調用創建對象,默認尋找的構造器就是這個。
5. 使用場景5. 1 Column Level的數據合併
有時候我們希望能夠實現兩個數據合併時,能夠按照每個列的實現不同的合併邏輯。這時候就可以在preCombine和combineAndGetUpdateValue方法中藉助schema遍歷所有列,然後做不同的處理。如果需要在preCombine中使用Schema,可以在構造器初始化的時候保留GenericRecord中schema的引用。如果發生序列化後的傳輸,同時又沒有使用schema可以序列化的版本(avro 1.8.2中 schema是不可序列化的對象),那麼可以從方法中傳遞的properties中傳遞的信息構建schema。
public HoodieRecordPayload preCombine(HoodieRecordPayload oldValue, Properties properties) { if (schema == null) { this.schema = new Schema.Parser().parse(properties.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key())); } initialSchema(properties); GenericRecord thisRecord = getInsertValue(schema).get(); GenericRecord otherRecord = oldValue.getInsertValue(schema).get(); List<Schema.Field> fields = schema.getFields(); for (Schema.Field field : fields) { // logic for each column } return new HoodieRecordPayload(thisRecord, orderingVal);}5.2 實現自定義的序列化方式
在默認的BaseAvroPayload中,一次upsert,一條數據通常最少要序列化/反序列化三次,第一次是創建payload的時候,第二次是在寫入時反序列化,第三次是寫入文件時序列化。如果數據非常複雜,序列化其實是非常耗時的。我們可以通過靈活定義payload來決定序列化的方式,減少觸發正反序列化的次數。這個技巧在Compaction的時候也可以獲得收益。如考慮如下場景:
對於一條kakfa的數據,我們可以把key和partition相關的內容存在kafka的key/timestamp中。然後使用binary的方式獲取kafka的value。通過kafka的key來構建HoodieRecordKey,然後將value直接以二進制方式存在payload中的map/list中,這樣不會觸發任何關於數據的序列化,額外的開銷很低。而後將合併的邏輯放在getInsertValue方法中,在從payload轉換成GenericRecord時,才將binary進行同一個key的數據合併和數據,這樣只需要一次avro的序列化操作就可以完成寫入過程。
需要注意的是,這樣的設計方式毫無疑問增加了複雜度,使業務邏輯抽象方式變難,同時因為avro的序列化壓縮比例通常比較高,如果直接傳輸業務數據,可能會有更大的IO和內存占用,需要根據場景評估收益。
6. 總結
本篇文章中我們介紹了Apache Hudi的關鍵數據抽象payload邏輯,同時介紹了幾種關鍵payload的實現,最後給出基於payload的幾種典型應用場景。
推薦閱讀
Apache Hudi 在 B 站構建實時數據湖的實踐
Apache Hudi在華米科技的應用-湖倉一體化改造
基於Hudi的流式CDC實踐一:聽說你準備了面試題?
如何將數據更快導入Apache Hudi?
基於Apache Hudi 的CDC數據入湖