close

(給ImportNew加星標,提高Java技能)

公司 DBA 一直埋怨 Atlas 的難用,希望從客戶端層出一個讀寫分離的方案。開源市場上在客戶端做讀寫分離的開源軟件基本上沒有。業務方利用 Spring 自帶的路由數據源能實現部分讀寫分離的功能,但是功能不夠完善。部分參考 Sharing-JDBC源碼思想,利用部分業餘時間,寫了這個 Robustdb,總共只使用了十多個類,兩千多行代碼左右。

一、背景

隨着業務量的增長,所有公司都不是直接選擇分庫分表設計方案的。很長一段時間內,會採用 庫垂直拆分和分區表 來解決庫表數據量比較大的問題,採用讀寫分離來解決訪問壓力比較大的問題。我們公司也是一樣。目前絕大部分業務還是使用讀寫分離的方案。我相信很多公司和我們公司的架構一樣,採用中間代理層做讀寫分離。結構如下:

第一層是 VIP 曾。通過 VIP 做中間映射層,避免了應用綁定數據庫的真實 IP,這樣在數據庫故障時,可以通過 VIP 飄移來將流量打到另一個庫。但是 VIP 無法跨機房,為未來的異地多活設計埋下繞不過去的坎。

VIP 下面一層是讀寫分離代理,我們公司使用的是 360 的 Atlas。Atlas 通過將 SQL 解析為 DML(Data Modify Language)和 DQL(Data Query Language),DML 的請求全部發到主庫,DQL 根據配置比例分發到讀庫(讀庫包括主庫和從庫)。

使用 Atlas 有以下不足:

Altas 不再維護更新,現存一些 bug,bug 網上很多描述;

Altas中沒有具體應用請求 IP 與具體數據庫 IP 之間的映射數據,所以無法準確查到訪問DB的請求是來自哪個應用;

Altas 控制的粒度是 SQL語句,只能指定某條查詢 SQL語句走主庫,不能根據場景指定;

DB 在自動關閉某個與 Altas之間的連接時,Altas不會刷新,它仍有可能把這個失效的連接給下次請求的應用使用;

使用 Altas,對後期增加其他功能模會比較麻煩。

基於 Atlas 以上問題,以及我們需要將數據庫賬號和連接配置集中管控。我們設計了下面這套方案:

通過在客戶端做讀寫分離可以解決 Atlas 上面存在的不足。整個流程如下圖所示:

二、Robustdb 原理

1、讀寫分離設計核心點——路由

支持每條 SQL 按照 DML、DQL 類型的默認路由。

需求描述

目前公司採用讀寫分離的方案來增強數據庫的性能,所有的 DML(insert、updata、delete)操作在主庫,通過 MySQL 的 binlog 同步,將數據同步到多個讀庫。所有的 DQL(select) 操作主庫或從庫,從而增強數據的讀能力。

支持方法級別的指定路由

需求描述

在 Service 中指定方法中所有 DB 操作方法操作同一個數據庫(主要是主庫),保證方法中的 DB 讀寫都操作主庫,避免數據同步延遲導致讀從庫數據異常。從而保證整個方法的事務屬性。

解決思路

我們將獲取真實數據庫(主庫還是哪個從庫)放到需要建立連接時的地方,為此我們創建了 BackendConnection(傳統是先連接數據庫,然後再創建連接)。

在獲取數據庫連接時,通過對請求的 SQL 進行解析和類型判別,識別為 DML 和 DQL。如果是DML,則在線程的單 SQL 線程本地變量上設置為 master,DQL 則設置為 slave,為後續選擇數據庫提供選擇參考。

如果要支持方法級別的事務(也就是整個方法的 SQL 請求都發送到主庫),需要藉助攔截器,我們採用的是 AspectJ 方式的攔截器。會攔截所有帶有類型為 dataSourceType 的 annotation 的方法。在執行方法前,在線程的多 SQL 線程本地變量上設置 dataSourceType 的 name 值(name 值為master 代表走主庫,name 值為 slave 代表走從庫)。線程的多 SQL 線程本地變量為後續選擇數據庫提供選擇參考。在方法執行完後,清理本地線程變量。

多 SQL 線程本地變量的優先級高於單 SQL 線程本地變量的優先級。


注意點

本地線程變量要使用阿里包裝的 Ttl,防止用戶在方法內部啟動線程池,導致普通的線程本地變量丟失,從而導致選庫異常。

使用 Ttl 之後,需要在公司的 JVM 啟動參數中增加

-javaagent:/{Path}/transmittable-thread-local-2.6.0-SNAPSHOT.jar

原理就是在 JVM 啟動時,加載 transmittable-thread-local 中的類替換邏輯,將以後的 Runnable、Callable、ExecuteService 等線程池相關類替換成增強後的 TtlRunnable、TtlCallable、TtlExecuteService 等。

下面展示一下時序圖中類的核心代碼,僅供參考:

DataSoueceAspect

@Aspect@Componentpublic class DataSourceAspect{ @Around("execution(* *(..)) && @annotation(dataSourceType)") public Object aroundMethod(ProceedingJoinPoint pjd, DataSourceType dataSourceType) throws Throwable { DataSourceContextHolder.setMultiSqlDataSourceType(dataSourceType.name()); Object result = pjd.proceed(); DataSourceContextHolder.clearMultiSqlDataSourceType(); return result; }}

BackendConnection

public final class BackendConnection extends AbstractConnectionAdapter { private AbstractRoutingDataSource abstractRoutingDataSource; // 用於緩存一條sql(可能對應多個statement)或者一次事務中的連接 private final Map<String, Connection> connectionMap = new HashMap<String, Connection>(); //構造函數 public BackendConnection(AbstractRoutingDataSource abstractRoutingDataSource) { this.abstractRoutingDataSource = abstractRoutingDataSource; } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql); } @Override public DatabaseMetaData getMetaData() throws SQLException { if(connectionMap == null || connectionMap.isEmpty()){ return abstractRoutingDataSource.getResolvedDefaultDataSource().getConnection().getMetaData(); } return fetchCachedConnection(connectionMap.keySet().iterator().next().toString()).get().getMetaData(); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql,resultSetType,resultSetConcurrency); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, autoGeneratedKeys); } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, columnIndexes); } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, columnNames); } @Override protected Collection<Connection> getConnections() { return connectionMap.values(); } /** * 根據sql獲取連接,對連接進行緩存 * @param sql * @return * @throws SQLException */ private Connection getConnectionInternal(final String sql) throws SQLException { //設置線程環境遍歷 if (ExecutionEventUtil.isDML(sql)) { DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.MASTER); } else if (ExecutionEventUtil.isDQL(sql)) { DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.SLAVE); } //根據上面設置的環境變量,選擇相應的數據源 Object dataSourceKey = abstractRoutingDataSource.determineCurrentLookupKey(); String dataSourceName = dataSourceKey.toString(); //看緩存中是否已經含有相應數據源的連接 Optional<Connection> connectionOptional = fetchCachedConnection(dataSourceName); if (connectionOptional.isPresent()) { return connectionOptional.get(); } //緩存中沒有相應連接,建立相應連接,並放入緩存 Connection connection = abstractRoutingDataSource.getTargetDataSource(dataSourceKey).getConnection(); connection.setAutoCommit(super.getAutoCommit()); connection.setTransactionIsolation(super.getTransactionIsolation()); connectionMap.put(dataSourceKey.toString(), connection); return connection; } /** * 從緩存中取數據源 * @param dataSourceName * @return */ private Optional<Connection> fetchCachedConnection(final String dataSourceName) { if (connectionMap.containsKey(dataSourceName)) { return Optional.of(connectionMap.get(dataSourceName)); } return Optional.absent(); }}

AbstractRoutingDataSource

/** * * @Type AbstractRoutingDataSource * @Desc 數據源路由器(spring的AbstractRoutingDataSource將resolvedDataSources的注入放在bean初始化) * @Version V1.0 */public abstract class AbstractRoutingDataSource extends AbstractDataSource {privatebooleanlenientFallback=true;privateMap<Object,Object>targetDataSources;privateObjectdefaultTargetDataSource;privateMap<Object,DataSource>resolvedDataSources=newHashMap<Object,DataSource>();privateDataSourceresolvedDefaultDataSource;privateLoggerlogger=LoggerFactory.getLogger(AbstractRoutingDataSource.class); public BackendConnection getConnection() throws SQLException { return new BackendConnection(this); } public BackendConnection getConnection(String username, String password) throws SQLException {returnnewBackendConnection(this); } public void afterPropertiesSet() { if (this.targetDataSources == null) { throw new IllegalArgumentException("Property 'targetDataSources' is required"); } this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size()); for (Map.Entry entry : this.targetDataSources.entrySet()) { Object lookupKey = resolveSpecifiedLookupKey(entry.getKey()); DataSource dataSource = resolveSpecifiedDataSource(entry.getValue()); this.resolvedDataSources.put(lookupKey, dataSource); } if (this.defaultTargetDataSource != null) { this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource); } } public void putNewDataSource(Object key, DataSource dataSource){ if(this.resolvedDataSources == null){ this.resolvedDataSources = new HashMap<Object, DataSource>(); } if(this.resolvedDataSources.containsKey(key)){ this.resolvedDataSources.remove(key); logger.info("remove old key:" + key); } logger.info("add key:" + key + ", value=" + dataSource); this.resolvedDataSources.put(key, dataSource); } /** * 數據源選擇邏輯 */ public DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSourceContextHolder.clearSingleSqlDataSourceType(); int index = 0; for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) { logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString()); index++; } DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } logger.debug("myAbstractDS, hit DS is " + dataSource.toString()); return dataSource; } public DataSource getTargetDataSource(Object lookupKey) { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); if(lookupKey == null){ lookupKey = determineCurrentLookupKey(); } DataSourceContextHolder.clearSingleSqlDataSourceType(); int index = 0; for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) { logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString()); index++; } DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } logger.debug("myAbstractDS, hit DS is " + dataSource.toString()); return dataSource; }publicabstractObjectdetermineCurrentLookupKey(); public abstract Object getCurrentSlaveKey(); @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface)); } @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> iface) throws SQLException { if (iface.isInstance(this)){ return (T) this; } return determineTargetDataSource().unwrap(iface); } protected Object resolveSpecifiedLookupKey(Object lookupKey) { return lookupKey; } protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException { if (dataSource instanceof DataSource) { return (DataSource) dataSource; } else { throw new IllegalArgumentException( "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource); } } //get set方法省略}

AbstractRoutingDataSource

/** * * @Type AbstractRoutingDataSource * @Desc 數據源路由器(spring的AbstractRoutingDataSource將resolvedDataSources的注入放在bean初始化) * @Version V1.0 */public abstract class AbstractRoutingDataSource extends AbstractDataSource {privatebooleanlenientFallback=true;privateMap<Object,Object>targetDataSources;privateObjectdefaultTargetDataSource;privateMap<Object,DataSource>resolvedDataSources=newHashMap<Object,DataSource>();privateDataSourceresolvedDefaultDataSource; private Logger logger = LoggerFactory.getLogger(AbstractRoutingDataSource.class); public BackendConnection getConnection() throws SQLException { return new BackendConnection(this); } public BackendConnection getConnection(String username, String password) throws SQLException {returnnewBackendConnection(this); } public void afterPropertiesSet() { if (this.targetDataSources == null) { throw new IllegalArgumentException("Property 'targetDataSources' is required"); } this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size()); for (Map.Entry entry : this.targetDataSources.entrySet()) { Object lookupKey = resolveSpecifiedLookupKey(entry.getKey()); DataSource dataSource = resolveSpecifiedDataSource(entry.getValue()); this.resolvedDataSources.put(lookupKey, dataSource); } if (this.defaultTargetDataSource != null) { this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource); } } public void putNewDataSource(Object key, DataSource dataSource){ if(this.resolvedDataSources == null){ this.resolvedDataSources = new HashMap<Object, DataSource>(); } if(this.resolvedDataSources.containsKey(key)){ this.resolvedDataSources.remove(key); logger.info("remove old key:" + key); } logger.info("add key:" + key + ", value=" + dataSource); this.resolvedDataSources.put(key, dataSource); } /** * 數據源選擇邏輯 */ public DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSourceContextHolder.clearSingleSqlDataSourceType(); int index = 0; for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) { logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString()); index++; } DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } logger.debug("myAbstractDS, hit DS is " + dataSource.toString()); return dataSource; } public DataSource getTargetDataSource(Object lookupKey) { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); if(lookupKey == null){ lookupKey = determineCurrentLookupKey(); } DataSourceContextHolder.clearSingleSqlDataSourceType(); int index = 0; for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) { logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString()); index++; } DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } logger.debug("myAbstractDS, hit DS is " + dataSource.toString()); return dataSource; }publicabstractObjectdetermineCurrentLookupKey(); public abstract Object getCurrentSlaveKey(); @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface)); } @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> iface) throws SQLException { if (iface.isInstance(this)){ return (T) this; } return determineTargetDataSource().unwrap(iface); } protected Object resolveSpecifiedLookupKey(Object lookupKey) { return lookupKey; } protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException { if (dataSource instanceof DataSource) { return (DataSource) dataSource; } else { throw new IllegalArgumentException( "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource); } } //get set方法省略}

DataSourceContextHolder

public class DataSourceContextHolder {privatestaticfinalTransmittableThreadLocal<String>singleSqlContextHolder=newTransmittableThreadLocal<String>(); private static final TransmittableThreadLocal<String> multiSqlContextHolder = new TransmittableThreadLocal<String>(); /** * @Description: 設置單條sql數據源類型 * @param dataSourceType 數據庫類型 * @return void * @throws */ public static void setSingleSqlDataSourceType(String dataSourceType) { singleSqlContextHolder.set(dataSourceType); } /** * @Description: 獲取單條sql數據源類型 * @param * @return String * @throws */ public static String getSingleSqlDataSourceType() { return singleSqlContextHolder.get(); } /** * @Description: 清除單條sql數據源類型 * @param * @return void * @throws */ public static void clearSingleSqlDataSourceType() { singleSqlContextHolder.remove(); } /** * @Description: 設置多條sql數據源類型 * @param dataSourceType 數據庫類型 * @return void * @throws */ public static void setMultiSqlDataSourceType(String dataSourceType) { multiSqlContextHolder.set(dataSourceType); } /** * @Description: 獲取多條sql數據源類型 * @param * @return String * @throws */ public static String getMultiSqlDataSourceType() { return multiSqlContextHolder.get(); } /** * @Description: 清除多條sql數據源類型 * @param * @return void * @throws */ public static void clearMultiSqlDataSourceType() { multiSqlContextHolder.remove(); } /** * 判斷當前線程是否為使用從庫為數據源. 最外層service有slave的aop標籤 或者 service沒有aop標籤且單條sql為DQL * * @return */ public static boolean isSlave() { return "slave".equals(multiSqlContextHolder.get()) || (multiSqlContextHolder.get()==null && "slave".equals(singleSqlContextHolder.get())) ;}}
DynamicDataSource
public class DynamicDataSource extends AbstractRoutingDataSource implements InitializingBean{ privatestaticfinalLoggerlogger=LoggerFactory.getLogger(DynamicDataSource.class); private Integer slaveCount = 0; // 輪詢計數,初始為-1,AtomicInteger是線程安全的 private AtomicInteger counter = new AtomicInteger(-1); // 記錄讀庫的key private List<Object> slaveDataSources = new ArrayList<Object>(0); // slave庫的權重 private Map<Object,Integer> slaveDataSourcesWeight;privateObjectcurrentSlaveKey; public DynamicDataSource() { super(); } /** * 構造函數 * @param defaultTargetDataSource * @param targetDataSources * @param slaveDataSourcesWeight */ public DynamicDataSource(Object defaultTargetDataSource, Map<Object,Object> targetDataSources, Map<Object,Integer> slaveDataSourcesWeight) { this.setResolvedDataSources(new HashMap<Object, DataSource>(targetDataSources.size())); for (Map.Entry<Object, Object> entry : targetDataSources.entrySet()) { DataSource dataSource = resolveSpecifiedDataSource(entry.getValue()); this.putNewDataSource(entry.getKey(), dataSource); } if (defaultTargetDataSource != null) { this.setResolvedDefaultDataSource(resolveSpecifiedDataSource(defaultTargetDataSource)); } this.setSlaveDataSourcesWeight(slaveDataSourcesWeight); this.afterPropertiesSet(); } @Override public Object determineCurrentLookupKey() { // 使用DataSourceContextHolder保證線程安全,並且得到當前線程中的數據源key if (DataSourceContextHolder.isSlave()) { currentSlaveKey = getSlaveKey(); return currentSlaveKey; } //TODO Object key = "master"; return key; } @Override public void afterPropertiesSet() { try { super.afterPropertiesSet(); Map<Object, DataSource> resolvedDataSources = this.getResolvedDataSources(); //清空從庫節點,重新生成 slaveDataSources.clear(); slaveCount = 0; for (Map.Entry<Object, DataSource> entry : resolvedDataSources.entrySet()) { if(slaveDataSourcesWeight.get(entry.getKey())==null){ continue; } for(int i=0; i<slaveDataSourcesWeight.get(entry.getKey());i++){ slaveDataSources.add(entry.getKey()); slaveCount++; } } } catch (Exception e) { logger.error("afterPropertiesSet error! ", e); } } /** * 輪詢算法實現 * * @return */ public Object getSlaveKey() { if(slaveCount <= 0 || slaveDataSources == null || slaveDataSources.size() <= 0){ return null; } Integer index = counter.incrementAndGet() % slaveCount; if (counter.get() > 9999) { // 以免超出Integer範圍 counter.set(-1); // 還原 } return slaveDataSources.get(index); } public Map<Object, Integer> getSlaveDataSourcesWeight() { return slaveDataSourcesWeight; } public void setSlaveDataSourcesWeight(Map<Object, Integer> slaveDataSourcesWeight) { this.slaveDataSourcesWeight = slaveDataSourcesWeight; } public Object getCurrentSlaveKey() { return currentSlaveKey; }}

2、讀庫流量分配策略設計

我們所有的數據庫連接都是管控起來的,包括每個庫的流量配置都是支持動態分配的。

支持讀庫按不同比例承接讀請求。通過配置頁面動態調整應用的數據庫連接以及比例,支持隨機或者順序的方式將流量分配到相應的讀庫中去。

這裡我們使用的配置管理下發中心是我們公司自己開發的 gconfig,當然替換成開源的 diamond 或者 applo 也是可以的。

當接收到配管中心的調整指令,會動態更新應用數據源連接,然後更新 beanFactory 中的 datasource。核心函數如下:

/** * 更新beanFactory * @param properties */public void refreshDataSource(String properties) { YamlDynamicDataSource dataSource; try { dataSource = new YamlDynamicDataSource(properties); } catch (IOException e) { throw new RuntimeException("convert datasource config failed!"); } // 驗證必須字段是否存在 if (dataSource == null && dataSource.getResolvedDataSources() == null || dataSource.getResolvedDefaultDataSource() == null || dataSource.getSlaveDataSourcesWeight() == null) { throw new RuntimeException("datasource config error!"); } ConcurrentHashMap<Object, DataSource> newDataSource = new ConcurrentHashMap<Object, DataSource>( dataSource.getResolvedDataSources()); //更新數據源的bean DynamicDataSource dynamicDataSource = (DynamicDataSource) ((DefaultListableBeanFactory) beanFactory) .getBean(dataSourceName); dynamicDataSource.setResolvedDefaultDataSource(dataSource.getResolvedDefaultDataSource()); dynamicDataSource.setResolvedDataSources(new HashMap<Object, DataSource>());//將數據源清空,重新添加 for (Entry<Object, DataSource> element : newDataSource.entrySet()) { dynamicDataSource.putNewDataSource(element.getKey(), element.getValue()); } dynamicDataSource.setSlaveDataSourcesWeight(dataSource.getSlaveDataSourcesWeight()); dynamicDataSource.afterPropertiesSet();}
三、性能

我們經過性能測試,發現 Robustdb 的性能在一定層度上比 Atlas 性能更好。壓測結果如下:

參考

https://tech.meituan.com/mtddl.html

https://tech.meituan.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E9%AB%98%E5%8F%AF%E7%94%A8%E6%9E%B6%E6%9E%84%E7%9A%84%E6%BC%94%E8%BF%9B%E4%B8%8E%E8%AE%BE%E6%83%B3.html

轉自:彥幀,

鏈接:jianshu.com/p/549d88222528

- EOF -

推薦閱讀點擊標題可跳轉

1、一款優秀數據庫中間件的不完全解析

2、MySQL讀寫分離:如何解決寫完讀不到問題

3、步步深入:MySQL 架構總覽->查詢執行流程->SQL 解析順序

看完本文有收穫?請轉發分享給更多人

關注「ImportNew」,提升Java技能

點讚和在看就是最大的支持❤️

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()