close

最近看了下關於分布式限流的部分,看到 Sentinel 的分布式限流,也就是集群限流的部分。想搭個環境看看,結果發現網上關於這方面的內容基本可以說沒有,你甚至很難跑起來他的 demo。就算能跑起來,估計也得自己研究半天,麻煩得要死。

我猜測很重要的原因可能就是 Sentinel 關於這塊做的並不完善,而且從官方的 Issue 中能看出來,其實官方對於這塊後續並沒有計劃去做的更好。

那麼廢話不多說。在此之前,肯定要先說下關於 Sentinel 集群限流方面的原理,沒有原理一切都是空中樓閣。

1. 集群限流原理

原理這方面比較好解釋,就是在原本的限流規則中加了一個 clusterMode 參數。如果設置為true,那麼會走集群限流模式,反之就是單機限流。

如果是集群限流,判斷身份是限流客戶端還是限流服務端。客戶端則和服務端建立通信,所有的限流都通過和服務端的交互來達到效果。

對於 Sentinel 集群限流,包含內嵌式和獨立式兩種模式。

內嵌式

什麼是內嵌式呢?簡單來說,要限流那麼必然要有個服務端去處理多個客戶端的限流請求。

對於內嵌式來說,整個微服務集群內部選擇一台機器節點作為限流服務端(Sentinel 把這個叫做 token-server),其他的微服務機器節點作為限流的客戶端(token-client),這樣的做法有缺點也有優點。

限流-嵌入式

首先說優點。這種方式部署不需要獨立部署限流服務端,節省獨立部署服務端產生的額外服務器開支,降低部署和維護複雜度。

再說缺點。缺點的話也可以說是整個 Sentinel 在集群限流這方面做得不夠好的問題。

第一個缺點:無自動故障轉移機制。無論是內嵌式還是獨立式的部署方案,都無法做到自動的故障轉移。

所有的 Server 和 Client 都需要事先知道 IP 的請求下做出配置。如果 Server 掛了,需要手動的修改配置,否則集群限流會退化成單機限流。

比如你的交易服務有 3 台機器 A、B、C,其中 A 被手動設置為 Server,B、C 則是作為 Client。當 A 服務器宕機之後,需要手動修改 B、C 中一台作為 Server,否則整個集群的機器都將退化回單機限流的模式。

但是,如果 Client 掛了是不會影響到整個集群限流的。比如 B 掛了,那麼 A 和 C 將會繼續組成集群限流。如果 B 再次重啟成功,那麼又會重新加入到整個集群限流當中來。因為會有一個自動重連的機制,默認的時間是 N * 2 秒,逐漸遞增的一個時間。

這是想用 Sentinel 做集群限流並且使用內嵌式需要考慮的問題,要自己去實現自動故障轉移的機制。當然,Server 節點選舉也要自己實現了。

對於這個問題,官方提供了可以修改 Server、Client 的 API 接口,另外一個就是可以基於動態的數據源配置方式,這個我們後面再談。

第二個缺點:適用於單微服務集群內部限流。

這個其實也是顯而易見的道理。都內部選舉一台作為 Server 去限流了,如果還跨多個微服務的話,顯然是不太合理的行為。現實中這種情況肯定也是非常少見的了,當然你非要想跨多個微服務集群也不是不可以,只要你開心就好。

第三個缺點:Server 節點的機器性能會受到一定程度的影響。

這個肯定也比較好理解的。作為 Server 去限流,那麼其他的客戶端肯定要和 Server 去通信才能做到集群限流啊,對不對?所以,一定程度上肯定會影響到 Server 節點本身服務的性能。但是我覺得問題不大,就當 Server 節點多了一個流量比較大的接口好了。

具體上會有多大的影響,我沒有實際對這塊做出實際的測試。如果真的流量非常大,需要實際測試一下這方面的問題。我認為影響還是可控的,本身Server 和 Client 基於 Netty 通信,通信的數據量其實也非常的小。

獨立式

說完內嵌式的這些優缺點,然後再說獨立式。也非常好理解,就是單獨部署一台機器作為限流服務端 Server,就不在本身微服務集群內部選一台作為 Server了。

限流-獨立式

很明顯,獨立式的優點就是解決了內嵌式的缺點。

不會和內嵌式一樣,影響到 Server 節點的本身性能;
可以適用於跨多個微服務之間的集群限流。

雖然獨立式解決了內嵌式的兩個缺點,但是缺點也來了。這同樣也是 Sentinel 本身並沒有幫助我們去解決的問題。

缺點一:需要獨立部署,會產生額外的資源(錢)和運維複雜度;

缺點二:Server 默認是單機,需要自己實現高可用方案。

缺點二很致命啊,官方的 Server 實現默認就是單機的,單點問題大家懂的都懂,自己實現高可用,我真的是有點服了。

這麼說 Sentinel 這個集群限流就是簡單的實現了一下,真正複雜的部分他都沒管,你可以這麼理解。

2. Run 起來

那基本原理大概了解之後,還是要真正跑起來看看效果的。畢竟開頭我就說了,網上這方面真的是感覺啥也搜不到,下面以嵌入式集群的方式舉例。

無論集群限流還是單機限流的方式,官方都支持寫死配置和動態數據源配置方式,寫的話下面的代碼中也都有,被我注釋掉了。至於動態數據源的配置,會基於 Apollo 來實現。

理解一下動態數據源的配置方式,基於這個我們可以實現限流規則的動態刷新,還有重點的一點可以做到基於修改配置方式的半自動故障轉移。

動態數據源支持推和拉兩種方式。比如文件系統和 Eureka 就是拉取的方式,定時讀取文件內容的變更。Eureka 則是建立 HTTP 連接,定時獲取元數據的變更。

推送的方式主要是基於事件監聽機制,比如 Apollo 和 Nacos。Redis 官方則是基於 Pub/Sub 來實現,默認的實現方式是基於 Lettuce,如果想用其他的客戶端要自己實現。

限流-集群工作模式

首先,該引入的包還是引入。

<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-annotation-aspectj</artifactId> <version>1.8.4</version></dependency><dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-transport-simple-http</artifactId> <version>1.8.4</version></dependency><dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-cluster-client-default</artifactId> <version>1.8.4</version></dependency><dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-cluster-server-default</artifactId> <version>1.8.4</version></dependency><dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-apollo</artifactId> <version>1.8.4</version></dependency>

實現 SPI。在 resources 目錄的 META-INF/services 下新增名為 com.alibaba.csp.sentinel.init.InitFunc 的文件,內容寫上我們自己實現的類名,比如我的 com.irving.demo.init.DemoClusterInitFunc。

實現 InitFunc 接口,重寫 init 方法。代碼直接貼出來,這裡整體依賴的是 Apollo 的配置方式,注釋的部分是我在測試的時候寫死代碼的配置方式,也是可以用的。

public class DemoClusterInitFunc implements InitFunc { private final String namespace = "application"; private final String ruleKey = "demo_sentinel"; private final String ruleServerKey = "demo_cluster"; private final String defaultRuleValue = "[]"; @Override public void init() throws Exception { // 初始化 限流規則 initDynamicRuleProperty(); //初始化 客戶端配置 initClientConfigProperty(); // 初始化 服務端配置信息 initClientServerAssignProperty(); registerClusterRuleSupplier(); // token-server的傳輸規則 initServerTransportConfigProperty(); // 初始化 客戶端和服務端狀態 initStateProperty(); } /** * 限流規則和熱點限流規則配置 */ private void initDynamicRuleProperty() { ReadableDataSource<String, List<FlowRule>> ruleSource = new ApolloDataSource<>(namespace, ruleKey, defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() { })); FlowRuleManager.register2Property(ruleSource.getProperty()); ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new ApolloDataSource<>(namespace, ruleKey, defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() { })); ParamFlowRuleManager.register2Property(paramRuleSource.getProperty()); } /** * 客戶端配置,注釋的部分是通過Apollo配置,只有一個配置我就省略了 */ private void initClientConfigProperty() { // ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(namespace, ruleKey, // defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() { // })); // ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty()); ClusterClientConfig clientConfig = new ClusterClientConfig(); clientConfig.setRequestTimeout(1000); ClusterClientConfigManager.applyNewConfig(clientConfig); } /** * client->server 傳輸配置,設置端口號,注釋的部分是寫死的配置方式 */ private void initServerTransportConfigProperty() { ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(namespace, ruleServerKey, defaultRuleValue, source -> { List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() { }); ServerTransportConfig serverTransportConfig = Optional.ofNullable(groupList) .flatMap(this::extractServerTransportConfig) .orElse(null); return serverTransportConfig; }); ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty()); // ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(transPort)); } private void registerClusterRuleSupplier() { ClusterFlowRuleManager.setPropertySupplier(namespace -> { ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey, defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() { })); return ds.getProperty(); }); ClusterParamFlowRuleManager.setPropertySupplier(namespace -> { ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey, defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() { })); return ds.getProperty(); }); } /** * 服務端配置,設置server端口和IP,注釋的配置是寫死的方式,這個在服務端是不用配置的,只有客戶端需要配置用來連接服務端 */ private void initClientServerAssignProperty() { ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(namespace, ruleServerKey, defaultRuleValue, source -> { List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() { }); ClusterClientAssignConfig clusterClientAssignConfig = Optional.ofNullable(groupList) .flatMap(this::extractClientAssignment) .orElse(null); return clusterClientAssignConfig; }); ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty()); // ClusterClientAssignConfig serverConfig = new ClusterClientAssignConfig(); // serverConfig.setServerHost("127.0.0.1"); //serverConfig.setServerPort(transPort); // ConfigSupplierRegistry.setNamespaceSupplier(() -> "trade-center"); // ClusterClientConfigManager.applyNewAssignConfig(serverConfig); } private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) { ClusterGroupEntity tokenServer = groupList.stream().filter(x -> x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findFirst().get(); Integer currentMachineState = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED); if (currentMachineState.equals(ClusterStateManager.CLUSTER_CLIENT)) { String ip = tokenServer.getIp(); Integer port = tokenServer.getPort(); return Optional.of(new ClusterClientAssignConfig(ip, port)); } return Optional.empty(); } /** * 初始化客戶端和服務端狀態,注釋的也是寫死的配置方式 */ private void initStateProperty() { ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(namespace, ruleServerKey, defaultRuleValue, source -> { List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() { }); Integer state = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED); return state; });ClusterStateManager.registerProperty(clusterModeDs.getProperty()); // ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER); } private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) { return groupList.stream() .filter(x -> x.getMachineId().equalsIgnoreCase(getCurrentMachineId()) && x.getState().equals(ClusterStateManager.CLUSTER_SERVER)) .findAny() .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600)); } private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) { return getCurrentMachineId().equals(group.getMachineId()); } private String getCurrentMachineId() { // 通過-Dcsp.sentinel.api.port=8719 配置, 默認8719,隨後遞增 return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort(); } private static final String SEPARATOR = "@";}

基礎類用來定義配置的基礎信息。

@Datapublic class ClusterGroupEntity { private String machineId; private String ip; private Integer port; private Integer state;}

然後是 Apollo 中的限流規則的配置和 Server/Client 集群關係的配置。

需要說明一下的就是 flowId,這個是區分限流規則的全局唯一 ID。必須要有,否則集群限流會有問題。

thresholdType 代表限流模式:

默認是 0,代表單機均攤。比如這裡 count 限流QPS=20,有 3 台機器,那麼集群限流閾值就是 60;

如果是 1,代表全局閾值。也就是 count 配置的值就是集群限流的上限。

demo_sentinel=[ { "resource": "test_res", //限流資源名 "count": 20, //集群限流QPS "clusterMode": true, //true為集群限流模式 "clusterConfig": { "flowId": 111, //這個必須得有,否則會有問題 "thresholdType": 1 //限流模式,默認為0單機均攤,1是全局閾值 } }]demo_cluster=[ { "ip": "192.168.3.20", "machineId": "192.168.3.20@8720", "port": 9999, //server和client通信接口 "state": 1 //指定為server }, { "ip": "192.168.3.20", "machineId": "192.168.3.20@8721", "state": 0 }, { "ip": "192.168.3.20", "machineId": "192.168.3.20@8722", "state": 0 }]

到這裡代碼和配置都已經 OK,還需要跑起來 Sentinel 控制台。這個不用教,還有啟動參數。

本地可以直接跑多個客戶端,注意修改端口號:-Dserver.port=9100 -Dcsp.sentinel.api.port=8720 這兩個一塊改。

至於怎麼連 Apollo 這塊我就省略了,自己整吧,公司應該都有。不行的話用代碼里的寫死的方式也可以用。

-Dserver.port=9100-Dcsp.sentinel.api.port=8720-Dcsp.sentinel.dashboard.server=localhost:8080-Dcsp.sentinel.log.use.pid=true
@SpringBootApplicationpublic class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); new FlowQpsDemo(); }}
因為有流量之後控制台才能看到限流的情況,所以用官方給的限流測試代碼修改一下,放到 SpringBoot 啟動類中,觸發限流規則的初始化。

測試限流代碼:

public class FlowQpsDemo { private static final String KEY = "test_res"; private static AtomicInteger pass = new AtomicInteger(); private static AtomicInteger block = new AtomicInteger(); private static AtomicInteger total = new AtomicInteger(); private static volatile boolean stop = false; private static final int threadCount = 32; private static int seconds = 60 + 40; public FlowQpsDemo() { tick(); simulateTraffic(); } private static void simulateTraffic() { for (int i = 0; i < threadCount; i++) { Thread t = new Thread(new RunTask()); t.setName("simulate-traffic-Task"); t.start(); } } private static void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } static class TimerTask implements Runnable { @Override public void run() { long start = System.currentTimeMillis(); System.out.println("begin to statistic!!!"); long oldTotal = 0; long oldPass = 0; long oldBlock = 0; while (!stop) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } long globalTotal = total.get(); long oneSecondTotal = globalTotal - oldTotal; oldTotal = globalTotal; long globalPass = pass.get(); long oneSecondPass = globalPass - oldPass; oldPass = globalPass; long globalBlock = block.get(); long oneSecondBlock = globalBlock - oldBlock; oldBlock = globalBlock; System.out.println(seconds + " send qps is: " + oneSecondTotal); System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock); if (seconds-- <= 0) {// stop = true; } } long cost = System.currentTimeMillis() - start; System.out.println("time cost: " + cost + " ms"); System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get()); System.exit(0); } } static class RunTask implements Runnable { @Override public void run() { while (!stop) { Entry entry = null; try { entry = SphU.entry(KEY); // token acquired, means pass pass.addAndGet(1); } catch (BlockException e1) { block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (entry != null) { entry.exit(); } } Random random2 = new Random(); try { TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); } catch (InterruptedException e) { // ignore } } } }}

啟動之後查看控制台,可以看到嵌入式的集群服務端已經啟動好。

查看限流的情況:

最後為了測試效果,再啟動一個客戶端,修改端口號為 9200 和 8721。可以看到新的客戶端已經連接到了服務端,不過這裡顯示的總 QPS 30000 和我們配置的不符,這個不用管它。

好了,這個就是集群限流原理和使用配置方式。當然了,你可以啟動多台服務,然後手動修改 Apollo 中的 state 參數修改服務端,驗證修改配置的方式是否能實現故障轉移機制。另外就是關閉 Client 或者 Server 驗證是否回退到單機限流的情況,這裡就不一一測試了,因為我已經測試過了。

對於獨立式的部署方式基本也是一樣的,只是單獨啟動一個服務端的服務,需要手動配置 Server。嵌入式的則不需要,loadServerNamespaceSet 配置為自己的服務名稱即可。

ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(11111));ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(DemoConstants.APP_NAME));tokenServer.start();

- EOF -

推薦閱讀點擊標題可跳轉

1、Sentinel 萬字教程

2、基於Redis和Lua的分布式限流

3、13 張圖讓你學會 Kafka 分區副本同步限流機制

看完本文有收穫?請轉發分享給更多人

關注「ImportNew」,提升Java技能

點讚和在看就是最大的支持❤️

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()