點擊上方
藍字關注我們

作者 | 小鑽風
編輯| 邱忠標
Kerberos 是一種計算機網絡授權協議,用來在非安全網絡中,對個人通信以安全的手段進行身份認證。
本文將手把手指導從零開始在 DolphinScheduler 中測試Kerberos 認證的HIVE 連接,其中也包括實際環境中所遇到問題的一些解決方法。希望該篇文章能夠有幸幫助到您成功掃雷~
巡山不辭辛苦,大王一高興,直接賜下水酒,心情自然是酣暢淋漓。想我是個名副其實的古靈精怪,卻也偶爾自詡算是小半個喜好登高望遠的文人騷客,此刻借着酒意,也想吟詩一首哇:拍案聲起,咱們書接上回《一文給你整明白租戶在 Apache DolphinScheduler 中的作用》。看完這篇文章後,相信大家已經對租戶有了明確認識,讓我們再回顧一下來,read after me:ziwu租,fuwu戶……集群開啟了 Kerberos 認證,剛接觸 DolphinScheduler 而還不知 Kerberos 的小夥伴們,不禁虎軀一震,只覺是腦門上放鞭炮(大難臨頭),嘶吼一句:配個 SQL 任務,就想連個 HIVE,show 一下 tables,就報任務執行失敗,這也太難咯……帶着義憤,還請咱們繼續往下看。度娘這次是不請自來,直呼:Kerberos 是一種計算機網絡授權協議,用來在非安全網絡中,對個人通信以安全的手段進行身份認證。進一步解釋是:通過密鑰系統為客戶機 / 服務器應用程序提供強大的認證服務。該認證過程的實現不依賴於主機操作系統的認證,無需基於主機地址的信任,不要求網絡上所有主機的物理安全,並假定網絡上傳送的數據包可以被任意地讀取、修改和插入數據。在以上情況下, Kerberos 作為一種可信任的第三方認證服務,是通過傳統的密碼技術(如:共享密鑰)執行認證服務的。簡而言之,Kerberos 是一種被廣泛運用在大數據生態中的身份認證協議,甚至可以說是大數據身份認證的事實標準。一句話來說,Kerberos 是一種基於加密 Ticket 的身份認證協議。Kerberos 主要由 Key Distribution Center (即KDC)、Client 和 Service 三部分組成,客戶端會先訪問兩次 KDC,然後再訪問目標 Service,如 HTTP服務。咱們就先盯盯 DolphinScheduler 中,與 Kerberos 相關的配置即可:resource.storage.type 開啟kerberos 需要指定文件存儲格式hadoop.security.authentication.startup.state 是否開啟kerberos 認證java.security.krb5.conf.path kerberos5配置文件所在路徑login.user.keytab.username kerberos認證的用戶principal名稱login.user.keytab.path 用戶的keytab認證文件所在路徑kerberos.expire.time kerberos 認證的過期時間好,天氣冷,屁股涼,話不多說,讓咱們帶着這些個配置參數進入 DolphinScheduler 源碼,摸清如何通過 Kerberos 認證連接。(Windows 本地測試環境,僅供參考,使用 IDEA 打開)位於 dolphinscheduler-common/src/main/resources 目錄下#......# resource store on HDFS/S3/NONEresource.storage.type=HDFS# whether to startup kerberoshadoop.security.authentication.startup.state=true# java.security.krb5.conf pathjava.security.krb5.conf.path=D:/security/krb5/krb5.conf# login user from keytab usernamelogin.user.keytab.username=hive@BIGDATA# login user from keytab pathlogin.user.keytab.path=D:/security/keytabs/hive.service.keytab#......位於 dolphinscheduler-dao/src/main/resources 目錄下
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.242.40:3306/dolphinscheduler_2.0.1_test?useUnicode=true&characterEncoding=UTF-8&useSSL=false username: bigdata password: bigdata@xiaozuanfeng123 hikari: connection-test-query: select 1 minimum-idle: 5 auto-commit: true validation-timeout: 3000 pool-name: DolphinScheduler maximum-pool-size: 50 connection-timeout: 30000 idle-timeout: 600000 leak-detection-threshold: 0 initialization-fail-timeout: 11、下載 zookeeper
D:\software\java>crulhttps://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz--outputapache-zookeeper-3.6.3-bin.tar.gzD:\software\java>tar-xvfapache-zookeeper-3.6.3-bin.tar.gzD:\software\java\apache-zookeeper-3.6.3-bin>cdconfD:\software\java\apache-zookeeper-3.6.3-bin\conf>copyzoo_sample.cfgzoo.cfgD:\software\java\apache-zookeeper-3.6.3-bin\bin>./zkServer.cmd在 dolphinscheduler-service/src/main/resources 目錄下registry.plugin.name=zookeeperregistry.servers=127.0.0.1:2181-Dspring.profiles.active=mysqldolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java -Dspring.profiles.active=mysqldolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java -Dspring.profiles.active=mysqldolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java進入dolphinscheduler-ui 目錄下執行 npm install --registry=https://registry.npm.taobao.org在dolphinscheduler-datasource-plugin/dolphinscheduler-datasourceapi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource目錄下的 AbstractDataSourceProcessor 類中private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\,]+$");private static final Pattern IPV6_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\:\\[\\]\\,]+$");protected void checkHost(String host) { if (!IPV4_PATTERN.matcher(host).matches() || !IPV6_PATTERN.matcher(host).matches()) { throw new IllegalArgumentException("datasource host illegal"); }}private static final Pattern DATABASE_PATTER = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.]+$");protected void checkDatasourcePatter(String database) { if (!DATABASE_PATTER.matcher(database).matches()) { throw new IllegalArgumentException("datasource name illegal"); }}private static final Pattern PARAMS_PATTER = Pattern.compile("^[a-zA-Z0-9\\-\\_\\/\\@\\.]+$");protected void checkOther(Map<String, String> other) { if (MapUtils.isEmpty(other)) { return; } boolean paramsCheck = other.entrySet().stream().allMatch(p -> PARAMS_PATTER.matcher(p.getValue()).matches()); if (!paramsCheck) { throw new IllegalArgumentException("datasource other params illegal"); }}對於小夥伴們遇到的具體連接參數中可能出現的特殊字符,需要結合實際自行修改呢!1、根據不同的數據庫類型,構建不同的 JDBC 連接參數。2、其中 HiveDatasourceProcessor 實現類就是咱們所關注的。@Overridepublic BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) { HiveDataSourceParamDTO hiveParam = (HiveDataSourceParamDTO) datasourceParam; // 用來組裝 如 jdbc:hive2://ip:port/db 的 jdbc url StringBuilder address = new StringBuilder(); address.append(Constants.JDBC_HIVE_2); for (String zkHost : hiveParam.getHost().split(",")) { address.append(String.format("%s:%s,", zkHost, hiveParam.getPort())); } address.deleteCharAt(address.length() - 1); String jdbcUrl = address.toString() + "/" + hiveParam.getDatabase(); // 設置 hive 連接參數,見名知意 HiveConnectionParam hiveConnectionParam = new HiveConnectionParam(); hiveConnectionParam.setDatabase(hiveParam.getDatabase()); hiveConnectionParam.setAddress(address.toString()); hiveConnectionParam.setJdbcUrl(jdbcUrl); hiveConnectionParam.setUser(hiveParam.getUserName()); hiveConnectionParam.setPassword(PasswordUtils.encodePassword(hiveParam.getPassword())); hiveConnectionParam.setDriverClassName(getDatasourceDriver()); hiveConnectionParam.setValidationQuery(getValidationQuery()); // 當文件存儲系統為 HDFS 並且 開啟 kerberos 認證時 // 設置 kerberos 相關參數 if (CommonUtils.getKerberosStartupState()) { hiveConnectionParam.setPrincipal(hiveParam.getPrincipal()); hiveConnectionParam.setJavaSecurityKrb5Conf(hiveParam.getJavaSecurityKrb5Conf()); hiveConnectionParam.setLoginUserKeytabPath(hiveParam.getLoginUserKeytabPath()); hiveConnectionParam.setLoginUserKeytabUsername(hiveParam.getLoginUserKeytabUsername()); } // 稍微重構了一下 transformOther 方法,提取到父類中 hiveConnectionParam.setOther(transformOther(getDbType(),hiveParam.getOther())); hiveConnectionParam.setProps(hiveParam.getOther()); // 返回 return hiveConnectionParam;}@Overridepublic Result<Object> checkConnection(DbType type, ConnectionParam connectionParam) { Result<Object> result = new Result<>(); // 測試連接 // 獲取連接 try (Connection connection = DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) { if (connection == null) { putMsg(result, Status.CONNECTION_TEST_FAILURE); return result; } putMsg(result, Status.SUCCESS); return result; } catch (Exception e) { logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, connectionParam, e.getMessage()); return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), e.getMessage()); }}dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin 目錄下的 DataSourceClientProvider 類中public Connection getConnection(DbType dbType, ConnectionParam connectionParam) { BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam; String datasourceUniqueId = DataSourceUtil.getDatasourceUniqueId(baseConnectionParam, dbType); logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId); // 有就直接獲取 // 沒有就創建 DataSourceClient dataSourceClient = uniqueId2dataSourceClientMap.computeIfAbsent(datasourceUniqueId, $ -> { Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap(); DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp()); if (null == dataSourceChannel) { throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp())); } return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType); }); return dataSourceClient.getConnection();}4、其中 HiveDataSourceClient 實現類就是我們所關注的。其父類 CommonDataSourceClient 構造分為 4 個過程,也是最為關鍵的四個過程,具體看 HiveDataSourceClient 中的實現。public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { this.baseConnectionParam = baseConnectionParam; // 預初始化 preInit(); // 校驗必要參數 checkEnv(baseConnectionParam); // 初始化連接 initClient(baseConnectionParam, dbType); // 檢查連接連通性 checkClient();}@Overrideprotected void preInit() { logger.info("PreInit in {}", getClass().getName()); // kerberos 連接池 this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor();}@Overrideprotected void checkEnv(BaseConnectionParam baseConnectionParam) { // 父類檢驗必要參數,在此忽略 super.checkEnv(baseConnectionParam); // 校驗 kerberos 必要參數 checkKerberosEnv();}private void checkKerberosEnv() { // kerberos5配置文件所在路徑 String krb5File = PropertyUtils.getString(JAVA_SECURITY_KRB5_CONF_PATH); if (StringUtils.isNotBlank(krb5File)) { // 設置系統環境 System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File); try { Config.refresh(); Class<?> kerberosName = Class.forName("org.apache.hadoop.security.authentication.util.KerberosName"); Field field = kerberosName.getDeclaredField("defaultRealm"); field.setAccessible(true); field.set(null, Config.getInstance().getDefaultRealm()); } catch (Exception e) { throw new RuntimeException("Update Kerberos environment failed.", e); } }}@Overrideprotected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { logger.info("Create Configuration for hive configuration."); // 可人為忽略 this.hadoopConf = createHadoopConf(); logger.info("Create Configuration success."); logger.info("Create UserGroupInformation."); // 創建 ugi this.ugi = createUserGroupInformation(baseConnectionParam.getUser()); logger.info("Create ugi success."); // 父類初始化必要的測試連接 super.initClient(baseConnectionParam, dbType); // 獲取 hive session 連接 this.oneSessionDataSource = JdbcDataSourceProvider.createOneSessionJdbcDataSource(dbType,baseConnectionParam); logger.info("Init {} success.", getClass().getName());}創建 UGIprivate UserGroupInformation createUserGroupInformation(String username) { // kerberos5 配置文件所在路徑 String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH); // 用戶的 keytab 認證文件所在路徑 String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH); // kerberos 認證的用戶 principal 名稱 String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME); try { // 創建 UGI UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(), principal, keytab, krb5File, username); try { Field isKeytabField = ugi.getClass().getDeclaredField("isKeytab"); isKeytabField.setAccessible(true); isKeytabField.set(ugi, true); } catch (NoSuchFieldException | IllegalAccessException e) { logger.warn(e.getMessage()); } kerberosRenewalService.scheduleWithFixedDelay(() -> { try { ugi.checkTGTAndReloginFromKeytab(); } catch (IOException e) { logger.error("Check TGT and Renewal from Keytab error", e); } }, 5, 5, TimeUnit.MINUTES); return ugi; } catch (IOException e) { throw new RuntimeException("createUserGroupInformation fail. ", e); }}咱們繼續跟進。public static synchronized UserGroupInformation createUGI(Configuration configuration, String principal, String keyTab, String krb5File, String username) throws IOException { // 文件系統 是否為 HDFS 以及 是否開啟kerberos 認證 if (getKerberosStartupState()) { Objects.requireNonNull(keyTab); if (StringUtils.isNotBlank(krb5File)) { // 設置系統環境 System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File); } // 進行登錄 return loginKerberos(configuration, principal, keyTab); } return UserGroupInformation.createRemoteUser(username);}public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, final String keyTab) throws IOException { // 配置 hadoop 認證模式為 kerberos config.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS); UserGroupInformation.setConfiguration(config); // 使用 principal 和對應的 keytab 登錄 UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim()); return UserGroupInformation.getCurrentUser();}創建 UGI 後獲取 hive session 連接,在 JdbcDataSourceProvider 類中。(劃重點,劃重點,所有需要通過 JDBC 獲取數據庫連接的功能,都由該類向外提供)public static HikariDataSource createOneSessionJdbcDataSource(DbType dbType,BaseConnectionParam properties) { logger.info("Creating OneSession HikariDataSource pool for maxActive:{}", PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE, 50)); HikariDataSource dataSource = new HikariDataSource(); dataSource.setDriverClassName(properties.getDriverClassName()); // 由於我的環境中需要 other 中的一些特徵參數,所以需要將 other 參數拼接進 jdbc url 中 // 而這麼做,基本上可以解決許多特殊的或者定製化的連接 // 其中該類中 createJdbcDataSource 方法同樣可以這麼處理 dataSource.setJdbcUrl(DataSourceUtil.getJdbcUrl(dbType,properties)); dataSource.setUsername(properties.getUser()); dataSource.setPassword(PasswordUtils.decodePassword(properties.getPassword())); // 前些天與小夥伴聊到相關,剛好看到相關 pr,便集成進來 Boolean isOneSession = PropertyUtils.getBoolean(Constants.SUPPORT_HIVE_ONE_SESSION, false); dataSource.setMinimumIdle(isOneSession ? 1 : PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE, 5)); dataSource.setMaximumPoolSize(isOneSession ? 1 : PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE, 50)); dataSource.setConnectionTestQuery(properties.getValidationQuery()); if (properties.getProps() != null) { properties.getProps().forEach(dataSource::addDataSourceProperty); } logger.info("Creating OneSession HikariDataSource pool success."); return dataSource;}
在其父類 CommonDataSourceClient 中使用創建的連接。//Checking data source clientStopwatch stopwatch = Stopwatch.createStarted();try { // 執行 select 1 測試是否成功,若本地測試,需要在 dolphinscheduler-datasource-api 下的 pom.xml 文件中添加 hive-jdbc 依賴,直接拷貝現成即可 this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());} catch (Exception e) { throw new RuntimeException("JDBC connect failed", e);} finally { logger.info("Time to execute check jdbc client with sql {} for {} ms ", this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));}到這兒,咱們終於一覽了開啟 Kerberos 的 Hive 連接過程,其中也包括實際環境中所遇到問題的一些解決方法。本次和小夥伴們分享了在 DolphinScheduler 中測試的 HIVE 連接,希望本文能夠有幸幫助到您成功掃雷。好,話再不多說……嗝~好酒!隨着國內開源的迅猛崛起,Apache DolphinScheduler 社區迎來蓬勃發展,為了做更好用、易用的調度,真誠歡迎熱愛開源的夥伴加入到開源社區中來,為中國開源崛起獻上一份自己的力量,讓本土開源走向全球。參與 DolphinScheduler 社區有非常多的參與貢獻的方式,包括:
貢獻第一個PR(文檔、代碼)我們也希望是簡單的,第一個PR用於熟悉提交的流程和社區協作以及感受社區的友好度。
社區匯總了以下適合新手的問題列表:https://github.com/apache/dolphinscheduler/issues/5689
非新手問題列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22
如何參與貢獻鏈接:https://dolphinscheduler.apache.org/zh-cn/docs/development/contribute.html
來吧,DolphinScheduler開源社區需要您的參與,為中國開源崛起添磚加瓦吧,哪怕只是小小的一塊瓦,匯聚起來的力量也是巨大的。
參與開源可以近距離與各路高手切磋,迅速提升自己的技能,如果您想參與貢獻,我們有個貢獻者種子孵化群,可以添加社區小助手
微信(Leonard-ds)手把手教會您( 貢獻者不分水平高低,有問必答,關鍵是有一顆願意貢獻的心 )。添加小助手微信時請說明想參與貢獻。
來吧,開源社區非常期待您的參與。
社區官網https://dolphinscheduler.apache.org/代碼倉地址https://github.com/apache/dolphinscheduler您的Star,是Apache DolphinScheduler為愛發電的動力❤️~


☞WorkflowAsCode 來了,Apache DolphinScheduler 2.0.2 驚喜發布!☞恭喜 Apache DolphinScheduler 入選可信開源社區共同體預備成員!☞Apache DolphinScheduler 獲評 2021 OSC 最受歡迎項目,白鯨開源獲優秀中國開源原生創企獎項!☞看看又是誰在悄悄做貢獻?☞感謝有你!所有貢獻者來領禮物了☞一文給你整明白多租戶在 Apache DolphinScheduler 中的作用☞開源並不是大牛的專屬,普通人也能有屬於自己的一畝三分地☞在 Apache DolphinScheduler 上調試 LDAP 登錄,親測有效!☞4 億用戶,7W+ 作業調度難題,Bigo 基於 Apache DolphinScheduler 巧化解