作者簡介盛宇帆,StreamNative 開發工程師,Apache Pulsar 與 Apache Flink 貢獻者。加入 StreamNative 之前,他曾就職於阿里大數據平台和騰訊雲負責 Flink 開發工作。盛宇帆是騰訊雲項目 Barad 的核心 committer 與項目落地負責人。他目前在 StreamNative 負責 Pulsar-Flink 和 Pulsar-Spark 相關的開發工作,他和他的團隊已經將 Pulsar Source Connector 貢獻給 Flink 社區,並於 Apache Flink 1.14.0 發布,並將在後續的發布中完整地將 Pulsar Connector 貢獻給社區。
編輯:Jipei@StreamNative,Apache Pulsar 貢獻者。
本文摘要
•批流一體是數據計算的未來趨勢,Pulsar Flink Connector 為基於 Apache Pulsar 在 Apache Flink 上以批流一體的方式處理數據提供了理想的解決方案。•StreamNative 已將 Pulsar Source Connector 貢獻至 Flink 1.14.0 版本。用戶可以使用它從 Pulsar 讀取數據,並保證每條數據只被處理一次。•最新 Pulsar Flink Connector 基於 Pulsar 2.8.0 和 Flink 1.14 版本,支持 Pulsar 的事務處理,進一步融合了兩者的特性。
背景
隨着數據日益膨脹,採用事件流處理數據至關重要。Apache Flink 將批流處理統一到計算引擎中,提供了一致化的編程接口。Apache Pulsar(與 Apache BookKeeper 一起)以 "流 "的方式統一數據。在 Pulsar 中,數據存儲成一個副本,以流(streaming)(通過 pub-sub 接口)和 segment(用於批處理)的方式進行訪問。Pulsar 解決了企業在使用不同的存儲和消息技術解決方案時遇到的數據孤島問題。
Flink 可以直接與 Pulsar broker 進行實時的流式讀寫,同時 Flink 也可以批量讀取 Pulsar 底層離線存儲,與 BookKeeper 的內容進行批次讀寫。同時支持批流,使得 Pulsar 和 Flink 先天就是契合的夥伴。把 Flink 和 Pulsar 結合使用,這兩種開源技術可以創建一個統一的數據架構,為實時數據驅動企業提供最佳解決方案。
為了將 Pulsar 與 Flink 的功能進行整合,為用戶提供更強大的開發能力,StreamNative 開發並開源了 Pulsar Flink Connector。經過多次的打磨,Pulsar Flink Connector 已合併進 Flink 代碼倉庫,並在 Flink 1.14.0 版本中發布!
Pulsar Flink Connector 基於 Apache Pulsar 和 Apache Flink 提供彈性數據處理,允許 Apache Flink 讀寫 Apache Pulsar 中的數據。使用 Pulsar Flink Connector,企業能夠更專注於業務邏輯,無需關注存儲問題。
打造全新的 Pulsar Flink Connector
在此版本之前,StreamNative 已發布Pulsar Flink Connector 2.7 版本。為什麼要推翻之前的代碼,重新打造批流融合呢?在新版本中進行了哪些重構呢?
新版本改動拆分設計
所有的數據消費都是基於 split(分流) 創建 Reader 去消費數據。如何將 Pulsar 消息抽象為 split?首先我們對 topic 進行抽象,針對每一個分區創建 Partition 示例。對有分區的 topic 就按數量創建,而對無分區的 topic 只有 1 個 partition,其值為 -1。
在 Pulsar 的 exclusive(獨占)、shared(共享)和 failover(災備)訂閱模式中,我們將 topic partition 包裝為在 Flink 上消費的 split,其中包含消費節點、存儲節點和兩個特殊的狀態,最後消費的消息 ID 和當前處理的事務 ID 分別用於 Pulsar 的不同模式。在 Pulsar 的 key_shared (鍵共享)模式中,在 topic partition 和 split 間映射的時候增加了 range 層。
針對每個分區創建 split 的原因在於:
•Pulsar 的分區實際也是 topic;•Topic 分區實際是子 topic;•僅可在單一 topic 上執行Consumer.seek()。
枚舉器(enumerator)設計
枚舉器對應 split 分發和訂閱的接口。這個設計注意分成兩個部分,一部分是基於 TopicList,對於用戶給定的一組 topic,從 Pulsar 進行信息查詢;另一部分是 Topic Pattern,查詢當前 topic、正則匹配並創建 split。
在 exclusive(獨占)、key_shared(鍵共享)和 failover(災備)模式中,一個 split 只會被以輪循的方式分配給一個 reader。
在 shared(共享)模式中,每個 split 會分給每個 reader,在此模式中,每個 reader 會消費 Pulsar 的每個 partition。
Reader 設計
在 exclusive(獨占)和 failover(災備)模式中,Reader 設計如下:
我們可以看到這個 topic 當前有三個分區,在 enumerator 這一層根據分區創建 3 個 split,Flink 的並行度為 3,產生 Reader 0、1、2 三個 reader 分別消費 split,由此形成獨占的消費模式。Failover 模式和獨占模式是一樣的消費模型,二者都是順序消費。
在 Pulsar 的 Shared 和 Key_shared 模式下,消費是無序的。我們既不希望它順序消費,也不希望一條條地 ACK。於是我們在這裡引入事務(transaction),每創建一條消息就開啟一個事務,在事務內進行 ACK,事務 ACK 會在 checkpoint 上進行提交。
類型系統
Pulsar 同 Flink 類似,都有類型系統。
Flink 的類型系統:
•DeserializationSchema:對原始數據進行解碼;•TypeInformation:Flink 每個 strength?之間基於 TypeInformation 進行數據序列化而傳輸;•TypeSerializer:TypeInformation 創建的序列化實例。
在 Pulsar 中:
•Schema:Pulsar Schema 是 Client 端數據序列化和反序列化的接口;•SchemaInfo:接口創建 SchemaInfo 傳輸給 Broker,broker 根據 SchemaInfo 進行 Schema 版本的兼容和 Schema 是否能夠升級的校驗。SchemaInfo 使 broker 不需要進行序列化和反序列化;•SchemaDefinition:給 Client 創建 Schema 所需的實例。
因此 Pulsar 和 Flink 在類型系統上進行打通,就產生了以下兩種模式:
•常見模式:Reader 以 Byte 數據的形式進行消費,用 Flink 的 DeserializationSchema 進行解析,DeserializationSchema 自帶的 TypeInformation 向下游傳遞。Flink 和其他消息系統也是用這種模式。
•Pulsar 獨有的模式:Reader 以 Byte 數據的形式進行消費,在 Flink 上以 Pulsar Schema 將數據進行解碼,並自動創建能在 Flink 上使用的 TypeInformation。
但是在第二種模式中沒有用到 Pulsar 自帶的 Schema 兼容和校驗,在下個版本中我們將用到這個特性。
版本要求
Flink 當前只提供 Pulsar Source connector,用戶可以使用它從 Pulsar 讀取數據,並保證每條數據只被處理一次。
連接器當前支持 Pulsar 2.7.0 之後的版本,但是連接器使用到了 Pulsar 的事務機制[1],建議在 Pulsar 2.8.0 及其之後的版本上使用連接器進行數據讀取。更多關於 Pulsar API 兼容性設計可閱讀PIP-72[2]。
閱讀文檔[3],了解如何將連接器添加到 Flink 集群實例內。
使用 Flink 1.14.0 的 Pulsar Source Connector
新版本的 Pulsar Source Connector 已被合併進 Flink 最新發布的 1.14.0 版本。如果要想使用基於舊版的SourceFunction實現的 Pulsar Source Connector,或者是使用的 Flink 版本低於 1.14,可以使用 StreamNative 單獨維護的pulsar-flink[4]。
構造 Pulsar Source Connector 實例
Pulsar Source Connector 提供了 builder 類來構造 Source Connector 實例。下面的代碼實例使用 builder 類創建的 Source Connector 會從 topic 「persistent://public/default/my-topic」 的數據開始端進行消費。連接器使用了 Exclusive(獨占)的訂閱方式消費消息,訂閱名稱為my-subscription,並把消息體的二進制字節流以 UTF-8 的方式編碼為字符串。
如果使用構造類構造 Pulsar Source Connector ,一定要提供下面幾個屬性:
•Pulsar 數據消費的地址,使用setServiceUrl(String)方法提供;•Pulsar HTTP 管理地址,使用setAdminUrl(String)方法提供;•Pulsar 訂閱名稱,使用setSubscriptionName(String)方法提供;•需要消費的 topic 或者是 topic 下面的分區,詳見指定消費的 Topic 或者 Topic 分區[5];•解碼 Pulsar 消息的反序列化器,詳見反序列化器[6]。
指定消費的 Topic/Topic 分區
Pulsar Source Connector 提供了兩種訂閱 topic 或 topic 分區的方式:
•Topic 列表,從這個 Topic 的所有分區上消費消息,例如:
•Topic 正則,連接器使用給定的正則表達式匹配出所有合規的 topic,例如:
Topic 名稱簡寫
從 Pulsar 2.0 之後,完整的 topic 名稱格式為{persistent|non-persistent}://租戶/命名空間/topic。但是連接器不需要提供 topic 名稱的完整定義,因為 topic 類型、租戶、命名空間都設置了默認值。
Topic 屬性默認值topic 類型persistent租戶public命名空間default
當前支持的簡寫方式:
topic 名稱簡寫翻譯後的 topic 名稱my-topicpersistent://public/default/my-topicmy-tenant/my-namespace/mu-topicpersistent://my-tenant/my-namespace/my-topic
⚠️注意:對於 non-persistent(非持久化) topic,連接器不支持簡寫名稱,non-persistent://public/default/my-topic不可簡寫成non-persistent://my-topic。
訂閱分區結構的 Topic
對於 Pulsar 而言,Topic 分區也是一種 Topic。Pulsar 會將一個有分區的 Topic 在內部按照分區的大小拆分成等量的無分區 Topic。例如,在 Pulsar 的sample租戶下面的flink命名空間裡面創建了一個有 3 個分區的 topic,給它起名為simple-string。可以在 Pulsar 上看到如下的 topic 列表:
這意味着,用戶可以用上面的子 topic 去直接消費分區裡面的數據,不需要再去基於上層的父 topic 去消費全部分區的數據。例如:使用PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2") 將會只消費 topicsample/flink/simple-string上面的分區 1 和 2 裡面的消息。
配置 Topic 正則表達式
前面提到了 Pulsar topic 有persistent、non-persistent兩種類型,使用正則表達式消費數據的時候,連接器會嘗試從正則表達式裡面解析出消息的類型。例如:PulsarSource.builder().setTopicPattern("non-persistent://my-topic*") 會解析出non-persistenttopic 類型。如果用戶使用 topic 名稱簡寫的方式,連接器會使用默認的消息類型persistent。
如果想用正則去消費persistent和non-persistent類型的 topic,需要使用RegexSubscriptionMode定義 topic 類型,例如:setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`。
解析消息——反序列化器
反序列化器用於解析 Pulsar 消息,連接器使用PulsarDeserializationSchema來定義反序列化器。用戶可以在 builder 類中使用setDeserializationSchema(PulsarDeserializationSchema)方法配置反序列化器,它會解析 Pulsar 的Message<byte[]>實例。
如果用戶只關心消息體的二進制字節流,並不需要其他屬性來解析數據。可以直接使用預定義的PulsarDeserializationSchema。Pulsar 連接器裡面提供了 3 種預定義好的反序列化器:
•使用 Pulsar 的Schema[7]解析消息。
•使用 Flink 的DeserializationSchema解析消息。
•使用 Flink 的 TypeInformation 解析消息。
Pulsar 的Message<byte[]>包含了很多額外的屬性[8]。例如,消息的 key、消息發送時間、消息生產時間、用戶在消息上自定義的鍵值對屬性等。可以使用Message<byte[]>接口來獲取這些屬性。
如果用戶需要基於這些額外的屬性來解析一條消息,可以實現PulsarDeserializationSchema接口, 並一定要確保PulsarDeserializationSchema.getProducedType()方法返回的TypeInformation是正確的結果。Flink 使用TypeInformation將解析出來的結果序列化傳遞到下游算子。
訂閱模式
Pulsar 共支持四種訂閱模式:exclusive(獨占)[9]、shared(共享)[10]、failover(災備)[11]、key_shared(key 共享)[12]。當前 Pulsar 連接器裡面,獨占和災備的實現沒有區別,如果 Flink 的一個 reader 掛了,連接器會把所有未消費的數據交給其他的 reader 來消費數據。默認情況下,如果沒有指定訂閱類型,連接器使用共享訂閱類型(SubscriptionType.Shared)。
如果想在 Pulsar 連接器裡面使用key 共享訂閱,需要提供RangeGenerator實例。RangeGenerator會生成一組消息 key 的 hash 範圍,連接器會基於給定的範圍來消費數據。Pulsar 連接器也提供了一個名為UniformRangeGenerator的默認實現,它會基於 flink Source Connector 的並行度將 hash 範圍均分。
起始消費位置
連接器使用setStartCursor(StartCursor)方法給定開始消費的位置。內置的消費位置有:
•從 topic 裡面最早的一條消息開始消費。
•從 topic 裡面最新的一條消息開始消費。
•從給定的消息開始消費。
•與前者不同的是,給定的消息可以跳過,再進行消費。
•從給定的消息時間開始消費。
每條消息都有一個固定的序列號,這個序列號在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用於在 Pulsar 底層存儲上查找到具體的消息。Pulsar 稱這個序列號為 MessageId,用戶可以使用DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)創建它。
邊界
Pulsar 連接器同時支持流式和批的消費方式,默認情況下,連接器使用流的方式消費數據。除非任務失敗或者被取消,否則連接器將持續消費數據。用戶可以使用setBoundedStopCursor(StopCursor) 給定停止消費的位置,這種情況下連接器會使用批的方式進行消費。當所有 topic 分區都消費到了停止位置,Flink 任務就會結束。使用流的方式一樣可以給定停止位置,使用setUnboundedStopCursor(StopCursor)方法即可。內置的停止位置如下:
•永不停止。
•停止於 Pulsar 啟動時 topic 裡面最新的那條數據。
•停止於某條消息,結果里不包含此消息。
•停止於某條消息之後,結果里包含此消息。
•停止於某個給定的消息時間戳。
其他配置項
除了前面提到的配置選項,連接器還提供了豐富的選項供 Pulsar 專家使用,在 builder 類里通過setConfig(ConfigOption<T>, T)和setConfig(Configuration)方法給定 Pulsar 客戶端、Pulsar API 的全部配置。具體參考其他配置項[13]
動態分區發現
為了能在啟動 Flink 任務之後還能發現在 Pulsar 上擴容的分區或者是新創建的 topic,連接器提供了動態分區發現機制。該機制不需要重啟 Flink 任務。對選項PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS設置一個正整數即可啟用。
默認情況下,Pulsar 啟用動態分區發現,查詢間隔為 30 秒。用戶可以給定一個負數,將該功能禁用。如果使用批的方式消費數據,將無法啟用該功能。
事件時間與 watermark
默認情況下,連接器使用 PulsarMessage<byte[]>裡面的時間作為解析結果的時間戳。用戶可以使用WatermarkStrategy來自行解析出想要的消息時間,並向下游傳遞對應的水位線。
定義WatermarkStrategy參考文檔[14]。
消息確認
一旦在 topic 上創建了訂閱,消息便會存儲在 Pulsar 里。即使沒有消費者,消息也不會被丟棄。只有當連接器同 Pulsar 確認此條消息已經被消費,該消息才以某種機制會被移除。連接器支持四種訂閱方式,它們的消息確認方式也大不相同。
獨占和災備訂閱
獨占和災備訂閱下,連接器使用累進式確認方式。確認某條消息已經被處理時,其前面被消費的消息會自動被置為已讀。Pulsar 連接器會在 Flink 完成檢查點時將對應時刻消費的消息置為已讀,以此來保證 Pulsar 狀態與 Flink 狀態一致。如果用戶沒有在 Flink 上啟用檢查點,連接器可以使用周期性提交來將消費狀態提交給 Pulsar,使用配置PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL來進行定義。
需要注意的是,此種場景下,Pulsar 連接器並不依賴於提交到 Pulsar 的狀態來做容錯。消息確認只是為了能在 Pulsar 端看到對應的消費處理情況。
共享和 key 共享訂閱
共享和key 共享需要依次確認每一條消息,所以連接器在 Pulsar 事務裡面進行消息確認,然後將事務提交到 Pulsar。首先需要在 Pulsar 的borker.conf文件裡面啟用事務:
連接器創建的事務的默認超時時間為 3 小時,請確保這個時間大於 Flink 檢查點的間隔。用戶可以使用PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS來設置事務的超時時間。
如果用戶無法啟用 Pulsar 的事務,或者是因為項目禁用了檢查點,需要將PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE選項設置為true,消息從 Pulsar 消費後會被立刻置為已讀。連接器無法保證此種場景下的消息一致性。連接器在 Pulsar 上使用日誌的形式記錄某個事務下的消息確認,為了更好的性能,請縮短 Flink 做檢查點的間隔。
升級與問題診斷
升級步驟參閱升級應用程序和 Flink 版本[15]。Pulsar 連接器沒有在 Flink 端存儲消費的狀態,所有的消費信息都推送到了 Pulsar。
注意:
•不要同時升級 Pulsar 連接器和 Pulsar 服務端的版本。•使用最新版本的 Pulsar 客戶端來消費消息。
Flink 只使用了 Pulsar 的Java 客戶端[16]和管理 API[17]。使用 Flink 和 Pulsar 交互時如果遇到問題,很有可能與 Flink 無關,請先升級 Pulsar 的版本、Pulsar 客戶端的版本,或者修改 Pulsar 的配置、Pulsar 連接器的配置來嘗試解決問題。
聯繫我們
歡迎大家使用 Pulsar Flink Connector 並與我們交流,共同優化這個批流一體的項目。目前社區已成立 Pulsar Flink Connector SIG(特殊興趣小組),掃描下方 Bot 二維碼,回復「Flink」加入 Pulsar Flink Connector SIG,與項目開發者交流。
引用鏈接
[1]事務機制:https://pulsar.apache.org/docs/en/txn-what/[2]PIP-72:https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification[3]文檔:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/project-configuration/[4]pulsar-flink:https://github.com/streamnative/pulsar-flink[5]指定消費的 Topic 或者 Topic 分區:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/pulsar/#%e6%8c%87%e5%ae%9a%e6%b6%88%e8%b4%b9%e7%9a%84-topic-%e6%88%96%e8%80%85-topic-%e5%88%86%e5%8c%ba[6]反序列化器:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/pulsar/#%e5%8f%8d%e5%ba%8f%e5%88%97%e5%8c%96%e5%99%a8[7]Schema:https://pulsar.apache.org/docs/en/schema-understand/[8]額外的屬性:https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF[9]exclusive(獨占):https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#exclusive[10]shared(共享):https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#shared[11]failover(災備):https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#failover[12]key_shared(key 共享):https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#key_shared[13]其他配置項:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/pulsar/#%e5%85%b6%e4%bb%96%e9%85%8d%e7%bd%ae%e9%a1%b9[14]文檔:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/[15]升級應用程序和 Flink 版本:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/ops/upgrading/[16]Java 客戶端:https://pulsar.apache.org/docs/zh-CN/client-libraries-java/[17]管理 API:https://pulsar.apache.org/docs/zh-CN/admin-api-overview/
▼關注「Apache Pulsar」,獲取乾貨與動態▼