close

點擊關注公眾號,實用技術文章及時了解


Dubbo 是一款優秀的微服務框架,它以其高性能、簡單易用、易擴展等特點,廣泛應用於互聯網、金融保險、科技公司、製造業、零售物流等多個領域。如今,Dubbo 框架已經成了互聯網開發中比較常用的技術框架。

在Dubbo框架中,當客戶端調用服務端的時候,請求抵達了服務端之後,會有專門的線程池去接收參數並且處理。所以如果要實現Dubbo的線程池監控,就需要先了解下Dubbo底層對於業務線程池的實現原理。

Dubbo底層對於線程池的查看

這裡我所使用的框架是 Dubbo 2.7.8 版本,它在底層對於線程池的管理是通過一個叫做ExecutorRepository 的類處理的,這個類負責創建並管理 Dubbo 中的線程池,通過該擴展接口,我們可以獲取到Dubbo再實際運行中的業務線程池對象。

具體的處理邏輯部分如下所示:

packageorg.idea.dubbo.monitor.core.collect;importorg.apache.dubbo.common.extension.ExtensionLoader;importorg.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository;importorg.apache.dubbo.common.threadpool.manager.ExecutorRepository;importjava.lang.reflect.Field;importjava.util.concurrent.ConcurrentMap;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.ThreadPoolExecutor;/***@Authoridea*@Datecreatedin7:04下午2022/6/29*/publicclassDubboThreadPoolCollector{/***獲取Dubbo的線程池*@return*/publicstaticThreadPoolExecutorgetDubboThreadPoolInfo(){//dubbo線程池數量監控try{ExtensionLoader<ExecutorRepository>executorRepositoryExtensionLoader=ExtensionLoader.getExtensionLoader(ExecutorRepository.class);DefaultExecutorRepositorydefaultExecutorRepository=(DefaultExecutorRepository)executorRepositoryExtensionLoader.getDefaultExtension();FielddataField=defaultExecutorRepository.getClass().getDeclaredField("data");dataField.setAccessible(true);ConcurrentMap<String,ConcurrentMap<Integer,ExecutorService>>data=(ConcurrentMap<String,ConcurrentMap<Integer,ExecutorService>>)dataField.get(defaultExecutorRepository);ConcurrentMap<Integer,ExecutorService>executorServiceConcurrentMap=data.get("java.util.concurrent.ExecutorService");//獲取到默認的線程池模型ThreadPoolExecutorthreadPoolExecutor=(ThreadPoolExecutor)executorServiceConcurrentMap.get(9090);returnthreadPoolExecutor;}catch(Exceptione){e.printStackTrace();}returnnull;}}

好了,現在我們知道如何在代碼中實時查看Dubbo線程池的信息了,那麼接下來要做的就是如何採集這些線程池的數據,並且進行上報,最後將上報存儲的數據通過統計圖的方式展示出來。

下邊我們按照採集,上報,展示三個環節來展示數據。

採集數據

在採集數據這塊,有兩種思路去採集,分別如下:

後台開啟一個定時任務,然後每秒都查詢一下線程池的參數信息。
每次有請求抵達provider的時候,就查看一些線程池的參數信息。

採用兩種不同的模式採集出來的數據,可能會有些差異,下邊是兩種方式的比對:

統計方式實現難度可能存在的問題定時任務採集數據簡單定時任務執行間隙中的數據無法採集,導致數據失真。請求抵達是採集數據稍為複雜一些在每次請求的時候都需要採集數據,會對性能有一定損耗。

通過對實際的業務場景分析,其實第二種方式對應用的性能損耗極微,甚至可以忽略,所以使用這種方式去採集數據的話會比較合適。

下邊讓我們一起來看看這種方式採集數據的話,該如何實現。

首先我們需要自己定義一個filter過濾器:

packageorg.idea.dubbo.monitor.core.filter;importorg.apache.dubbo.common.constants.CommonConstants;importorg.apache.dubbo.common.extension.Activate;importorg.apache.dubbo.rpc.*;importorg.idea.dubbo.monitor.core.DubboMonitorHandler;importjava.util.concurrent.ThreadPoolExecutor;importstaticorg.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;/***@Authoridea*@Datecreatedin2:33下午2022/7/1*/@Activate(group=CommonConstants.PROVIDER)publicclassDubboRecordFilterimplementsFilter{@OverridepublicResultinvoke(Invoker<?>invoker,Invocationinvocation)throwsRpcException{ThreadPoolExecutorthreadPoolExecutor=DubboMonitorHandler.getDubboThreadPoolInfo();//請求的時候趣統計線程池,當請求量太小的時候,這塊的數據可能不準確,但是如果請求量大的話,就接近準確了DUBBO_INFO_STORE_CENTER.reportInfo(9090,threadPoolExecutor.getActiveCount(),threadPoolExecutor.getQueue().size());returninvoker.invoke(invocation);}}

關於DUBBO_INFO_STORE_CENTER的代碼如下所示:

並且在dubbo的spi配置文件中指定好它們:

dubboRecordFilter=org.idea.dubbo.monitor.core.filter.DubboRecordFilter

當provider加入了這個過濾器以後,若有請求抵達服務端,則會通過這個filter觸發採集操作。

packageorg.idea.dubbo.monitor.core.collect;importorg.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/***Dubbo數據存儲中心**@Authoridea*@Datecreatedin11:15上午2022/7/1*/publicclassDubboInfoStoreCenter{privatestaticMap<Integer,DubboInfoStoreBO>dubboInfoStoreBOMap=newConcurrentHashMap<>();publicvoidreportInfo(Integerport,IntegercorePoolSize,IntegerqueueLength){synchronized(this){DubboInfoStoreBOdubboInfoStoreBO=dubboInfoStoreBOMap.get(port);if(dubboInfoStoreBO!=null){booleanhasChange=false;intcurrentMaxPoolSize=dubboInfoStoreBO.getMaxCorePoolSize();intcurrentMaxQueueLength=dubboInfoStoreBO.getMaxCorePoolSize();if(corePoolSize>currentMaxPoolSize){dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);hasChange=true;}if(queueLength>currentMaxQueueLength){dubboInfoStoreBO.setMaxQueueLength(queueLength);hasChange=true;}if(hasChange){dubboInfoStoreBOMap.put(port,dubboInfoStoreBO);}}else{dubboInfoStoreBO=newDubboInfoStoreBO();dubboInfoStoreBO.setMaxQueueLength(queueLength);dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);dubboInfoStoreBOMap.put(port,dubboInfoStoreBO);}}}publicDubboInfoStoreBOgetInfo(Integerport){returndubboInfoStoreBOMap.get(port);}publicvoidcleanInfo(Integerport){dubboInfoStoreBOMap.remove(port);}}

注意這個採集類只會採集一段時間的數據,然後定期會清空重置。

之所以這麼做,是希望用這個map統計指定時間內的最大線程數和最大隊列數,接着當這些峰值數據被上報到存儲中心後就進行清空。

關於DubboInfoStoreCenter對象的定義,我將它放置在了一個叫做CommonCache的類裡面,具體如下:

packageorg.idea.dubbo.monitor.core.config;importorg.idea.dubbo.monitor.core.store.DubboInfoStoreCenter;/***@Authoridea*@Datecreatedin12:15下午2022/7/1*/publicclassCommonCache{publicstaticDubboInfoStoreCenterDUBBO_INFO_STORE_CENTER=newDubboInfoStoreCenter();}

所以在上邊的過濾器中,我們才可以直接通過靜態類引用去調用它的採集接口。

好了,現在整體來看,我們已經實現了在過濾器中去實時採集線程池的數據,並且將它暫存在了一個Map表中,這個map的數據主要是記錄了某段時間內的線程池峰值,供採集器角色去使用。

那麼接下來,我們就來看看上報器模塊主要做了哪些操作。

上報數據

上報數據前,最重要的就是選擇合適的存儲組件了。首先上報的數據本身體量並不大,我們可以將採集時間短設置為15秒,那麼設計一個上報任務,每隔15秒採集一次dubbo線程池的數據。那麼一天的時間就需上報5760次,假設一次上報存儲一條記錄的話,那麼一天下來所需要存儲的數據也並不是特別多。

並且存儲下來的服務數據實際上也並不需要保留太長的時間,一般存儲個一周時間也就足夠了,所以最終我選用啦Redis進行這方面的存儲。

我們實際每次關注的數據字段主要有三個,關於它們的定義我整理成了下邊這個對象:

packageorg.idea.dubbo.monitor.core.bo;/***@Authoridea*@Datecreatedin7:17下午2022/6/29*/publicclassThreadInfoBO{privateIntegeractivePoolSize;privateIntegerqueueLength;privatelongsaveTime;publicIntegergetActivePoolSize(){returnactivePoolSize;}publicvoidsetActivePoolSize(IntegeractivePoolSize){this.activePoolSize=activePoolSize;}publicIntegergetQueueLength(){returnqueueLength;}publicvoidsetQueueLength(IntegerqueueLength){this.queueLength=queueLength;}publiclonggetSaveTime(){returnsaveTime;}publicvoidsetSaveTime(longsaveTime){this.saveTime=saveTime;}@OverridepublicStringtoString(){return"ThreadInfoBO{"+",queueLength="+queueLength+",saveTime="+saveTime+'}';}}

接着會開啟一個線程任務,每間隔15秒就會執行一輪上報數據的動作:

packageorg.idea.dubbo.monitor.core.report;importcom.alibaba.fastjson.JSON;importorg.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;importorg.idea.dubbo.monitor.core.bo.ThreadInfoBO;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.CommandLineRunner;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importstaticorg.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;/***@Authoridea*@Datecreatedin12:13下午2022/7/1*/publicclassDubboInfoReportHandlerimplementsCommandLineRunner{@AutowiredprivateIReportTemplatereportTemplate;privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(DubboInfoReportHandler.class);publicstaticExecutorServiceexecutorService=Executors.newFixedThreadPool(1);publicstaticintDUBBO_PORT=9090;@Overridepublicvoidrun(String...args)throwsException{executorService.submit(newRunnable(){@Overridepublicvoidrun(){while(true){try{Thread.sleep(10000);DubboInfoStoreBOdubboInfoStoreBO=DUBBO_INFO_STORE_CENTER.getInfo(DUBBO_PORT);ThreadInfoBOthreadInfoBO=newThreadInfoBO();threadInfoBO.setSaveTime(System.currentTimeMillis());if(dubboInfoStoreBO!=null){threadInfoBO.setQueueLength(dubboInfoStoreBO.getMaxQueueLength());threadInfoBO.setActivePoolSize(dubboInfoStoreBO.getMaxCorePoolSize());}else{//這種情況可能是對應的時間段內沒有流量請求到provider上threadInfoBO.setQueueLength(0);threadInfoBO.setActivePoolSize(0);}//這裡是上報器上報數據到redis中reportTemplate.reportData(JSON.toJSONString(threadInfoBO));//上報之後,這裡會重置map中的數據DUBBO_INFO_STORE_CENTER.cleanInfo(DUBBO_PORT);LOGGER.info("===========Dubbo線程池數據上報===========");}catch(Exceptione){e.printStackTrace();}}}});}}

這類要注意下,Dubbo應用的線程池上報任務應當等整個SpringBoot應用啟動成功之後再去觸發,否則可能會有些許數據不準確性。所以再定義Bean初始化線程的時候,我選擇了CommandLineRunner接口。

細心查看代碼的你可能會看到這麼一個類:

org.idea.dubbo.monitor.core.report.IReportTemplate

這個類定義了數據上報器的基本動作,下邊是它的具體代碼:

packageorg.idea.dubbo.monitor.core.report;/***上報模版**@Authoridea*@Datecreatedin7:10下午2022/6/29*/publicinterfaceIReportTemplate{/***上報數據**@return*/booleanreportData(Stringjson);}

實現類部分如下所示:

packageorg.idea.dubbo.monitor.core.report.impl;importorg.idea.dubbo.monitor.core.report.IReportTemplate;importorg.idea.qiyu.cache.redis.service.IRedisService;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.time.LocalDate;importjava.util.concurrent.TimeUnit;/***@Authoridea*@Datecreatedin7:12下午2022/6/29*/@ComponentpublicclassRedisTemplateImplimplementsIReportTemplate{@ResourceprivateIRedisServiceredisService;privatestaticStringqueueKey="dubbo:threadpool:info:";@OverridepublicbooleanreportData(Stringjson){redisService.lpush(queueKey+LocalDate.now().toString(),json);redisService.expire(queueKey+LocalDate.now().toString(),7,TimeUnit.DAYS);returntrue;}}

這裡面我採用的是list的結構去存儲這些數據指標,設定了一個過期時間為一周,最終存儲到redis之後的格式如下所示:

數據展示

好了,現在我們已經完成了對線程池的監控,最後只需要設計一個管理台,從緩存中提取上報的數據並且進行頁面的展示即可。

實現的邏輯比較簡單,只需要定義好統計圖所需要的數據結構,然後在controller曾返回即可,例如下圖所示:

最終展現出來的效果如下圖:

隨着請求dubbo接口的量發生變化,統計圖可以展示出dubbo線程池的數據變動情況。如果希望統計圖以實時的方式展示數據的話,其實只需要在js中寫一個定時調用的函數即可。

這裡我是使用的是echart插件做的圖表渲染,我選用的是最簡單的統計圖類型,大家也可以根據自己的具體所需在echart的官網上選擇合適的模型進行渲染,下邊這是echart的官網地址:

https://echarts.apache.org/examples/zh/index.html

推薦

Java面試題寶典

技術內卷群,一起來學習!!

PS:因為公眾號平台更改了推送規則,如果不想錯過內容,記得讀完點一下「在看」,加個「星標」,這樣每次新文章推送才會第一時間出現在你的訂閱列表里。點「在看」支持我們吧!

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()